diff options
author | Joey Hess <joey@kitenet.net> | 2012-09-17 21:05:50 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-09-17 21:05:50 -0400 |
commit | 3c22977e44b852ecc4d1ad2d728e5dc9071952ae (patch) | |
tree | ef2f0d8e1635c49dea2bf4d4876e25a46981f058 | |
parent | 7a86dc944306af4d0a707631b03ef93941ecc1be (diff) |
deferred downloads
Now when a download is queued and there's no known remote to get it from,
it's added to a list of deferred downloads, which will be retried later.
The Merger thread tries to queue any deferred downloads when it receives
a push to the git-annex branch.
Note that the Merger thread now also forces an update of the git-annex
branch. The assistant was not updating this branch before, and it still saw
a (mostly) correct view of state, but now that incoming pushes go to
synced/git-annex, the pushed branch needs to be merged in.
-rw-r--r-- | Assistant.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 35 | ||||
-rw-r--r-- | Assistant/TransferQueue.hs | 70 | ||||
-rw-r--r-- | doc/design/assistant/syncing.mdwn | 6 |
4 files changed, 76 insertions, 37 deletions
diff --git a/Assistant.hs b/Assistant.hs index d71dba5fe..b0a00ebe8 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -185,7 +185,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do #endif , assist $ pushThread st dstatus commitchan pushmap , assist $ pushRetryThread st dstatus pushmap - , assist $ mergeThread st + , assist $ mergeThread st dstatus transferqueue , assist $ transferWatcherThread st dstatus , assist $ transferPollerThread st dstatus , assist $ transfererThread st dstatus transferqueue transferslots diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index ce1ff1c7b..c2aa1f52d 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -9,6 +9,8 @@ module Assistant.Threads.Merger where import Assistant.Common import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.TransferQueue import Utility.DirWatcher import Utility.Types.DirWatcher import qualified Annex.Branch @@ -19,15 +21,14 @@ import qualified Git.Branch thisThread :: ThreadName thisThread = "Merger" -{- This thread watches for changes to .git/refs/, looking for - - incoming pushes. It merges those pushes into the currently - - checked out branch. -} -mergeThread :: ThreadState -> NamedThread -mergeThread st = thread $ do +{- This thread watches for changes to .git/refs/, and handles incoming + - pushes. 
-} +mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread +mergeThread st dstatus transferqueue = thread $ do g <- runThreadState st $ fromRepo id let dir = Git.localGitDir g </> "refs" createDirectoryIfMissing True dir - let hook a = Just $ runHandler g a + let hook a = Just $ runHandler st dstatus transferqueue g a let hooks = mkWatchHooks { addHook = hook onAdd , errHook = hook onErr @@ -37,21 +38,21 @@ mergeThread st = thread $ do where thread = NamedThread thisThread -type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () +type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> FilePath -> Maybe FileStatus -> IO () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler g handler file filestatus = void $ +runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus transferqueue g handler file filestatus = void $ either print (const noop) =<< tryIO go where - go = handler g file filestatus + go = handler st dstatus transferqueue g file filestatus {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ msg _ = error msg +onErr _ _ _ _ msg _ = error msg {- Called when a new branch ref is written. - @@ -65,14 +66,16 @@ onErr _ msg _ = error msg - ran are merged in. 
-} onAdd :: Handler -onAdd g file _ +onAdd st dstatus transferqueue g file _ | ".lock" `isSuffixOf` file = noop - | isAnnexBranch file = noop - | "/synced/" `isInfixOf` file = go =<< Git.Branch.current g + | isAnnexBranch file = runThreadState st $ + whenM Annex.Branch.forceUpdate $ + queueDeferredDownloads Later transferqueue dstatus + | "/synced/" `isInfixOf` file = mergecurrent =<< Git.Branch.current g | otherwise = noop where changedbranch = fileToBranch file - go (Just current) + mergecurrent (Just current) | equivBranches changedbranch current = do liftIO $ debug thisThread [ "merging" @@ -81,7 +84,7 @@ onAdd g file _ , show current ] void $ Git.Merge.mergeNonInteractive changedbranch g - go _ = noop + mergecurrent _ = noop equivBranches :: Git.Ref -> Git.Ref -> Bool equivBranches x y = base x == base y diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 3d0464c73..21479d04d 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -11,6 +11,7 @@ module Assistant.TransferQueue ( newTransferQueue, getTransferQueue, queueTransfers, + queueDeferredDownloads, queueTransfer, queueTransferAt, queueTransferWhenSmall, @@ -32,6 +33,7 @@ import qualified Data.Map as M data TransferQueue = TransferQueue { queuesize :: TVar Int , queuelist :: TVar [(Transfer, TransferInfo)] + , deferreddownloads :: TVar [(Key, AssociatedFile)] } data Schedule = Next | Later @@ -41,48 +43,78 @@ newTransferQueue :: IO TransferQueue newTransferQueue = atomically $ TransferQueue <$> newTVar 0 <*> newTVar [] + <*> newTVar [] {- Reads the queue's content without blocking or changing it. 
-} getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)] getTransferQueue q = atomically $ readTVar $ queuelist q stubInfo :: AssociatedFile -> Remote -> TransferInfo -stubInfo f r = TransferInfo - { startedTime = Nothing - , transferPid = Nothing - , transferTid = Nothing - , transferRemote = Just r - , bytesComplete = Nothing +stubInfo f r = stubTransferInfo + { transferRemote = Just r , associatedFile = f - , transferPaused = False } {- Adds transfers to queue for some of the known remotes. -} queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers schedule q dstatus k f direction = do - rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus) - mapM_ go =<< sufficientremotes rs + rs <- sufficientremotes + =<< knownRemotes <$> liftIO (getDaemonStatus dstatus) + if null rs + then defer + else forM_ rs $ \r -> liftIO $ + enqueue schedule q dstatus (gentransfer r) (stubInfo f r) where sufficientremotes rs - -- Queue downloads from all remotes that - -- have the key, with the cheapest ones first. - -- More expensive ones will only be tried if - -- downloading from a cheap one fails. + {- Queue downloads from all remotes that + - have the key, with the cheapest ones first. + - More expensive ones will only be tried if + - downloading from a cheap one fails. -} | direction == Download = do uuids <- Remote.keyLocations k return $ filter (\r -> uuid r `elem` uuids) rs - -- TODO: Determine a smaller set of remotes that - -- can be uploaded to, in order to ensure all - -- remotes can access the content. Currently, - -- send to every remote we can. + {- TODO: Determine a smaller set of remotes that + - can be uploaded to, in order to ensure all + - remotes can access the content. Currently, + - send to every remote we can. -} | otherwise = return $ filter (not . 
Remote.readonly) rs gentransfer r = Transfer { transferDirection = direction , transferKey = k , transferUUID = Remote.uuid r } - go r = liftIO $ - enqueue schedule q dstatus (gentransfer r) (stubInfo f r) + defer + {- Defer this download, as no known remote has the key. -} + | direction == Download = void $ liftIO $ atomically $ + modifyTVar' (deferreddownloads q) $ + \l -> (k, f):l + | otherwise = noop + +{- Queues any deferred downloads that can now be accomplished, leaving + - any others in the list to try again later. -} +queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex () +queueDeferredDownloads schedule q dstatus = do + rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus) + l <- liftIO $ atomically $ swapTVar (deferreddownloads q) [] + left <- filterM (queue rs) l + unless (null left) $ + liftIO $ atomically $ modifyTVar' (deferreddownloads q) $ + \new -> new ++ left + where + queue rs (k, f) = do + uuids <- Remote.keyLocations k + let sources = filter (\r -> uuid r `elem` uuids) rs + unless (null sources) $ + forM_ sources $ \r -> liftIO $ + enqueue schedule q dstatus + (gentransfer r) (stubInfo f r) + return $ null sources + where + gentransfer r = Transfer + { transferDirection = Download + , transferKey = k + , transferUUID = Remote.uuid r + } enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () enqueue schedule q dstatus t info diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn index 222adae5e..b9c6016b2 100644 --- a/doc/design/assistant/syncing.mdwn +++ b/doc/design/assistant/syncing.mdwn @@ -39,7 +39,7 @@ all the other git clones, at both the git level and the key/value level. Possible solution: C could record a download intent. (Similar to a failed download, but with an unknown source.) When C next receives a git-annex branch push, it could try to requeue downloads that it has such intents - registered for. + registered for. 
**done** Note that this solution won't cover use cases the other does. For example, connect a USB drive A; B syncs files from it, and then should pass them to C. @@ -85,6 +85,10 @@ all the other git clones, at both the git level and the key/value level. need to use the TransferScanner, if we get and check a list of the changed files. * [[use multiple transfer slots|todo/Slow_transfer_for_a_lot_of_small_files.]] +* The TransferQueue's list of deferred downloads could theoretically + grow without bounds in memory. Limit it to a given number of entries, + and fall back to some other method -- either storing deferred downloads + on disk, or perhaps scheduling a TransferScanner run to get back into sync. ## data syncing |