summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-10-13 17:14:56 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-10-13 17:14:56 -0400
commit7e226e373b203dce324b9d3b7fc2a82ca1781183 (patch)
tree50cf3d083441c4a99d4e46f95a66421e1e1d2a4a /Assistant/Threads
parenta0865e6c494ec4bdfb7ab058a5633a8634d48bd4 (diff)
cronner: run jobs triggered by remotes becoming connected (untested)
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/Cronner.hs106
1 files changed, 76 insertions, 30 deletions
diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs
index a1f716882..8eb3d472e 100644
--- a/Assistant/Threads/Cronner.hs
+++ b/Assistant/Threads/Cronner.hs
@@ -34,6 +34,7 @@ import Assistant.WebApp.Types
import Git.Remote (RemoteName)
import Control.Concurrent.Async
+import Control.Concurrent.MVar
import Data.Time.LocalTime
import Data.Time.Clock
import qualified Data.Map as M
@@ -42,8 +43,13 @@ import qualified Control.Exception as E
import qualified Data.Text as T
{- Loads schedules for this repository, and fires off one thread for each
- - scheduled event. These threads sleep until the next time the event
- - should run.
+ - scheduled event that runs on this repository. Each thread sleeps until
+ - its event is scheduled to run.
+ -
+ - To handle events that run on remotes, which need to only run when
+ - their remote gets connected, threads are also started, and are passed
+ - a MVar to wait on, which is stored in the DaemonStatus's
+ - connectRemoteNotifiers.
-
- In the meantime the main thread waits for any changes to the
- schedules. When there's a change, compare the old and new list of
@@ -53,51 +59,63 @@ cronnerThread :: UrlRenderer -> NamedThread
cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do
dstatus <- getDaemonStatus
h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus)
- go h M.empty
+ go h M.empty M.empty
where
- go h m = do
+ go h amap nmap = do
activities <- liftAnnex $ scheduleGet =<< getUUID
- let addedactivities = activities `S.difference` M.keysSet m
- let removedactivities = M.keysSet m `S.difference` activities
+ let addedactivities = activities `S.difference` M.keysSet amap
+ let removedactivities = M.keysSet amap `S.difference` activities
forM_ (S.toList removedactivities) $ \activity ->
- case M.lookup activity m of
+ case M.lookup activity amap of
Just a -> do
debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)]
liftIO $ cancel a
Nothing -> noop
lastruntimes <- liftAnnex getLastRunTimes
- addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes
+ started <- startactivities (S.toList addedactivities) lastruntimes
+ let addedamap = M.fromList $ map fst started
+ let addednmap = M.fromList $ catMaybes $ map snd started
+
+ let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities)
+ let amap' = M.difference (M.union addedamap amap) (removefiltered amap)
+ let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap)
+ modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') }
liftIO $ waitNotification h
debug ["reloading changed activities"]
-
- let m' = M.difference (M.union addedm m)
- (M.filterWithKey (\k _ -> S.member k removedactivities) m)
- go h m'
- startactivities as lastruntimes = forM as $ \activity -> do
- runner <- asIO2 (activityThread urlrenderer)
- a <- liftIO $ async $
- runner activity (M.lookup activity lastruntimes)
- return (activity, a)
+ go h amap' nmap'
+ startactivities as lastruntimes = forM as $ \activity ->
+ case connectActivityUUID activity of
+ Nothing -> do
+ runner <- asIO2 (sleepingActivityThread urlrenderer)
+ a <- liftIO $ async $
+ runner activity (M.lookup activity lastruntimes)
+ return ((activity, a), Nothing)
+ Just u -> do
+ mvar <- liftIO newEmptyMVar
+ runner <- asIO2 (remoteActivityThread urlrenderer mvar)
+ a <- liftIO $ async $
+ runner activity (M.lookup activity lastruntimes)
+ return ((activity, a), Just (activity, (u, [mvar])))
{- Calculate the next time the activity is scheduled to run, then
- sleep until that time, and run it. Then call setLastRunTime, and
- loop.
-}
-activityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
-activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
+sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
+sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
where
getnexttime = liftIO . nextTime schedule
go _ Nothing = debug ["no scheduled events left for", desc]
- go l (Just (NextTimeExactly t)) = runafter l t Nothing run
+ go l (Just (NextTimeExactly t)) = waitrun l t Nothing
go l (Just (NextTimeWindow windowstart windowend)) =
- runafter l windowstart (Just windowend) run
+ waitrun l windowstart (Just windowend)
desc = fromScheduledActivity activity
schedule = getSchedule activity
- runafter l t mmaxt a = do
+ waitrun l t mmaxt = do
seconds <- liftIO $ secondsUntilLocalTime t
when (seconds > Seconds 0) $ do
debug ["waiting", show seconds, "for next scheduled", desc]
@@ -109,7 +127,7 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
then do
debug ["too late to run scheduled", desc]
go l =<< getnexttime l
- else a nowt
+ else run nowt
where
tolate nowt tz = case mmaxt of
Just maxt -> nowt > maxt
@@ -118,12 +136,31 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
(localTimeToUTC tz nowt)
(localTimeToUTC tz t) > 600
run nowt = do
- debug ["starting", desc]
- runActivity urlrenderer activity
- debug ["finished", desc]
- liftAnnex $ setLastRunTime activity nowt
+ runActivity urlrenderer activity nowt
go (Just nowt) =<< getnexttime (Just nowt)
+{- Wait for the remote to become available by waiting on the MVar.
+ - Then check if the time is within a time window when activity
+ - is scheduled to run, and if so run it.
+ - Otherwise, just wait again on the MVar.
+ -}
+remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
+remoteActivityThread urlrenderer mvar activity lasttime = do
+ liftIO $ takeMVar mvar
+ go =<< liftIO (nextTime (getSchedule activity) lasttime)
+ where
+ go (Just (NextTimeWindow windowstart windowend)) = do
+ now <- liftIO getCurrentTime
+ tz <- liftIO $ getTimeZone now
+ if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend
+ then do
+ let nowt = utcToLocalTime tz now
+ runActivity urlrenderer activity nowt
+ loop (Just nowt)
+ else loop lasttime
+ go _ = noop -- running at exact time not handled here
+ loop = remoteActivityThread urlrenderer mvar activity
+
secondsUntilLocalTime :: LocalTime -> IO Seconds
secondsUntilLocalTime t = do
now <- getCurrentTime
@@ -133,15 +170,24 @@ secondsUntilLocalTime t = do
then Seconds secs
else Seconds 0
-runActivity :: UrlRenderer -> ScheduledActivity -> Assistant ()
-runActivity urlrenderer (ScheduledSelfFsck _ d) = do
+runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant ()
+runActivity urlrenderer activity nowt = do
+ debug ["starting", desc]
+ runActivity' urlrenderer activity
+ debug ["finished", desc]
+ liftAnnex $ setLastRunTime activity nowt
+ where
+ desc = fromScheduledActivity activity
+
+runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant ()
+runActivity' urlrenderer (ScheduledSelfFsck _ d) = do
program <- liftIO $ readProgramFile
void $ runFsck urlrenderer Nothing $
batchCommand program (Param "fsck" : fsckParams d)
mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir)
where
reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download
-runActivity urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
+runActivity' urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
where
go (Just r) = void $ case Remote.remoteFsck r of
Nothing -> void $ runFsck urlrenderer (Just $ Remote.name r) $ do