summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-10-13 17:14:56 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-10-13 17:14:56 -0400
commit7e226e373b203dce324b9d3b7fc2a82ca1781183 (patch)
tree50cf3d083441c4a99d4e46f95a66421e1e1d2a4a
parenta0865e6c494ec4bdfb7ab058a5633a8634d48bd4 (diff)
cronner: run jobs triggered by remotes becoming connected (untested)
-rw-r--r--Assistant/Sync.hs9
-rw-r--r--Assistant/Threads/Cronner.hs106
-rw-r--r--Assistant/Types/DaemonStatus.hs4
-rw-r--r--Types/ScheduledActivity.hs6
4 files changed, 94 insertions, 31 deletions
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index fe578ab43..fe4fda896 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -44,13 +44,17 @@ import Control.Concurrent
- they push to us. Since XMPP pushes run ansynchronously, any scan of the
- XMPP remotes has to be deferred until they're done pushing to us, so
- all XMPP remotes are marked as possibly desynced.
+ -
+ - Also handles signaling any connectRemoteNotifiers, after the syncing is
+ - done.
-}
reconnectRemotes :: Bool -> [Remote] -> Assistant ()
reconnectRemotes _ [] = noop
reconnectRemotes notifypushes rs = void $ do
modifyDaemonStatus_ $ \s -> s
{ desynced = S.union (S.fromList $ map Remote.uuid xmppremotes) (desynced s) }
- syncAction rs (const go)
+ void $ syncAction rs (const go)
+ mapM_ signal rs
where
gitremotes = filter (notspecialremote . Remote.repo) rs
(xmppremotes, nonxmppremotes) = partition isXMPPRemote rs
@@ -73,6 +77,9 @@ reconnectRemotes notifypushes rs = void $ do
filter (not . remoteAnnexIgnore . Remote.gitconfig)
nonxmppremotes
return failed
+ signal r = liftIO . mapM_ (flip tryPutMVar ())
+ =<< fromMaybe [] . M.lookup (Remote.uuid r) . connectRemoteNotifiers
+ <$> getDaemonStatus
{- Updates the local sync branch, then pushes it to all remotes, in
- parallel, along with the git-annex branch. This is the same
diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs
index a1f716882..8eb3d472e 100644
--- a/Assistant/Threads/Cronner.hs
+++ b/Assistant/Threads/Cronner.hs
@@ -34,6 +34,7 @@ import Assistant.WebApp.Types
import Git.Remote (RemoteName)
import Control.Concurrent.Async
+import Control.Concurrent.MVar
import Data.Time.LocalTime
import Data.Time.Clock
import qualified Data.Map as M
@@ -42,8 +43,13 @@ import qualified Control.Exception as E
import qualified Data.Text as T
{- Loads schedules for this repository, and fires off one thread for each
- - scheduled event. These threads sleep until the next time the event
- - should run.
+ - scheduled event that runs on this repository. Each thread sleeps until
+ - its event is scheduled to run.
+ -
+ - To handle events that run on remotes, which need to only run when
+ - their remote gets connected, threads are also started, and are passed
+ - a MVar to wait on, which is stored in the DaemonStatus's
+ - connectRemoteNotifiers.
-
- In the meantime the main thread waits for any changes to the
- schedules. When there's a change, compare the old and new list of
@@ -53,51 +59,63 @@ cronnerThread :: UrlRenderer -> NamedThread
cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do
dstatus <- getDaemonStatus
h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus)
- go h M.empty
+ go h M.empty M.empty
where
- go h m = do
+ go h amap nmap = do
activities <- liftAnnex $ scheduleGet =<< getUUID
- let addedactivities = activities `S.difference` M.keysSet m
- let removedactivities = M.keysSet m `S.difference` activities
+ let addedactivities = activities `S.difference` M.keysSet amap
+ let removedactivities = M.keysSet amap `S.difference` activities
forM_ (S.toList removedactivities) $ \activity ->
- case M.lookup activity m of
+ case M.lookup activity amap of
Just a -> do
debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)]
liftIO $ cancel a
Nothing -> noop
lastruntimes <- liftAnnex getLastRunTimes
- addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes
+ started <- startactivities (S.toList addedactivities) lastruntimes
+ let addedamap = M.fromList $ map fst started
+ let addednmap = M.fromList $ catMaybes $ map snd started
+
+ let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities)
+ let amap' = M.difference (M.union addedamap amap) (removefiltered amap)
+ let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap)
+ modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') }
liftIO $ waitNotification h
debug ["reloading changed activities"]
-
- let m' = M.difference (M.union addedm m)
- (M.filterWithKey (\k _ -> S.member k removedactivities) m)
- go h m'
- startactivities as lastruntimes = forM as $ \activity -> do
- runner <- asIO2 (activityThread urlrenderer)
- a <- liftIO $ async $
- runner activity (M.lookup activity lastruntimes)
- return (activity, a)
+ go h amap' nmap'
+ startactivities as lastruntimes = forM as $ \activity ->
+ case connectActivityUUID activity of
+ Nothing -> do
+ runner <- asIO2 (sleepingActivityThread urlrenderer)
+ a <- liftIO $ async $
+ runner activity (M.lookup activity lastruntimes)
+ return ((activity, a), Nothing)
+ Just u -> do
+ mvar <- liftIO newEmptyMVar
+ runner <- asIO2 (remoteActivityThread urlrenderer mvar)
+ a <- liftIO $ async $
+ runner activity (M.lookup activity lastruntimes)
+ return ((activity, a), Just (activity, (u, [mvar])))
{- Calculate the next time the activity is scheduled to run, then
- sleep until that time, and run it. Then call setLastRunTime, and
- loop.
-}
-activityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
-activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
+sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
+sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
where
getnexttime = liftIO . nextTime schedule
go _ Nothing = debug ["no scheduled events left for", desc]
- go l (Just (NextTimeExactly t)) = runafter l t Nothing run
+ go l (Just (NextTimeExactly t)) = waitrun l t Nothing
go l (Just (NextTimeWindow windowstart windowend)) =
- runafter l windowstart (Just windowend) run
+ waitrun l windowstart (Just windowend)
desc = fromScheduledActivity activity
schedule = getSchedule activity
- runafter l t mmaxt a = do
+ waitrun l t mmaxt = do
seconds <- liftIO $ secondsUntilLocalTime t
when (seconds > Seconds 0) $ do
debug ["waiting", show seconds, "for next scheduled", desc]
@@ -109,7 +127,7 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
then do
debug ["too late to run scheduled", desc]
go l =<< getnexttime l
- else a nowt
+ else run nowt
where
tolate nowt tz = case mmaxt of
Just maxt -> nowt > maxt
@@ -118,12 +136,31 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
(localTimeToUTC tz nowt)
(localTimeToUTC tz t) > 600
run nowt = do
- debug ["starting", desc]
- runActivity urlrenderer activity
- debug ["finished", desc]
- liftAnnex $ setLastRunTime activity nowt
+ runActivity urlrenderer activity nowt
go (Just nowt) =<< getnexttime (Just nowt)
+{- Wait for the remote to become available by waiting on the MVar.
+ - Then check if the time is within a time window when activity
+ - is scheduled to run, and if so run it.
+ - Otherwise, just wait again on the MVar.
+ -}
+remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
+remoteActivityThread urlrenderer mvar activity lasttime = do
+ liftIO $ takeMVar mvar
+ go =<< liftIO (nextTime (getSchedule activity) lasttime)
+ where
+ go (Just (NextTimeWindow windowstart windowend)) = do
+ now <- liftIO getCurrentTime
+ tz <- liftIO $ getTimeZone now
+ if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend
+ then do
+ let nowt = utcToLocalTime tz now
+ runActivity urlrenderer activity nowt
+ loop (Just nowt)
+ else loop lasttime
+ go _ = noop -- running at exact time not handled here
+ loop = remoteActivityThread urlrenderer mvar activity
+
secondsUntilLocalTime :: LocalTime -> IO Seconds
secondsUntilLocalTime t = do
now <- getCurrentTime
@@ -133,15 +170,24 @@ secondsUntilLocalTime t = do
then Seconds secs
else Seconds 0
-runActivity :: UrlRenderer -> ScheduledActivity -> Assistant ()
-runActivity urlrenderer (ScheduledSelfFsck _ d) = do
+runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant ()
+runActivity urlrenderer activity nowt = do
+ debug ["starting", desc]
+ runActivity' urlrenderer activity
+ debug ["finished", desc]
+ liftAnnex $ setLastRunTime activity nowt
+ where
+ desc = fromScheduledActivity activity
+
+runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant ()
+runActivity' urlrenderer (ScheduledSelfFsck _ d) = do
program <- liftIO $ readProgramFile
void $ runFsck urlrenderer Nothing $
batchCommand program (Param "fsck" : fsckParams d)
mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir)
where
reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download
-runActivity urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
+runActivity' urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
where
go (Just r) = void $ case Remote.remoteFsck r of
Nothing -> void $ runFsck urlrenderer (Just $ Remote.name r) $ do
diff --git a/Assistant/Types/DaemonStatus.hs b/Assistant/Types/DaemonStatus.hs
index afb5f940a..abb857a36 100644
--- a/Assistant/Types/DaemonStatus.hs
+++ b/Assistant/Types/DaemonStatus.hs
@@ -18,6 +18,7 @@ import Assistant.Types.NetMessager
import Assistant.Types.Alert
import Control.Concurrent.STM
+import Control.Concurrent.MVar
import Control.Concurrent.Async
import Data.Time.Clock.POSIX
import qualified Data.Map as M
@@ -69,6 +70,8 @@ data DaemonStatus = DaemonStatus
-- When the XMPP client is connected, this will contain the XMPP
-- address.
, xmppClientID :: Maybe ClientID
+ -- MVars to signal when a remote gets connected.
+ , connectRemoteNotifiers :: M.Map UUID [MVar ()]
}
type TransferMap = M.Map Transfer TransferInfo
@@ -100,3 +103,4 @@ newDaemonStatus = DaemonStatus
<*> newNotificationBroadcaster
<*> newNotificationBroadcaster
<*> pure Nothing
+ <*> pure M.empty
diff --git a/Types/ScheduledActivity.hs b/Types/ScheduledActivity.hs
index 386f42333..b683409ce 100644
--- a/Types/ScheduledActivity.hs
+++ b/Types/ScheduledActivity.hs
@@ -19,6 +19,12 @@ data ScheduledActivity
| ScheduledRemoteFsck UUID Schedule Duration
deriving (Eq, Read, Show, Ord)
+{- Activities that run on a remote, within a time window, so
+ - should be run when the remote gets connected. -}
+connectActivityUUID :: ScheduledActivity -> Maybe UUID
+connectActivityUUID (ScheduledRemoteFsck u (Schedule _ AnyTime) _) = Just u
+connectActivityUUID _ = Nothing
+
getSchedule :: ScheduledActivity -> Schedule
getSchedule (ScheduledSelfFsck s _) = s
getSchedule (ScheduledRemoteFsck _ s _) = s