summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-09-17 21:05:50 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-09-17 21:05:50 -0400
commit3c22977e44b852ecc4d1ad2d728e5dc9071952ae (patch)
treeef2f0d8e1635c49dea2bf4d4876e25a46981f058
parent7a86dc944306af4d0a707631b03ef93941ecc1be (diff)
deferred downloads
Now when a download is queued and there's no known remote to get it from, it's added to a deferred download list, which will be retried later. The Merger thread tries to queue any deferred downloads when it receives a push to the git-annex branch. Note that the Merger thread now also forces an update of the git-annex branch. The assistant was not updating this branch before, and it saw a (mostly) correct view of state, but now that incoming pushes go to synced/git-annex, it needs to be merged in.
-rw-r--r--Assistant.hs2
-rw-r--r--Assistant/Threads/Merger.hs35
-rw-r--r--Assistant/TransferQueue.hs70
-rw-r--r--doc/design/assistant/syncing.mdwn6
4 files changed, 76 insertions, 37 deletions
diff --git a/Assistant.hs b/Assistant.hs
index d71dba5fe..b0a00ebe8 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -185,7 +185,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
#endif
, assist $ pushThread st dstatus commitchan pushmap
, assist $ pushRetryThread st dstatus pushmap
- , assist $ mergeThread st
+ , assist $ mergeThread st dstatus transferqueue
, assist $ transferWatcherThread st dstatus
, assist $ transferPollerThread st dstatus
, assist $ transfererThread st dstatus transferqueue transferslots
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
index ce1ff1c7b..c2aa1f52d 100644
--- a/Assistant/Threads/Merger.hs
+++ b/Assistant/Threads/Merger.hs
@@ -9,6 +9,8 @@ module Assistant.Threads.Merger where
import Assistant.Common
import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Annex.Branch
@@ -19,15 +21,14 @@ import qualified Git.Branch
thisThread :: ThreadName
thisThread = "Merger"
-{- This thread watches for changes to .git/refs/, looking for
- - incoming pushes. It merges those pushes into the currently
- - checked out branch. -}
-mergeThread :: ThreadState -> NamedThread
-mergeThread st = thread $ do
+{- This thread watches for changes to .git/refs/, and handles incoming
+ - pushes. -}
+mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread
+mergeThread st dstatus transferqueue = thread $ do
g <- runThreadState st $ fromRepo id
let dir = Git.localGitDir g </> "refs"
createDirectoryIfMissing True dir
- let hook a = Just $ runHandler g a
+ let hook a = Just $ runHandler st dstatus transferqueue g a
let hooks = mkWatchHooks
{ addHook = hook onAdd
, errHook = hook onErr
@@ -37,21 +38,21 @@ mergeThread st = thread $ do
where
thread = NamedThread thisThread
-type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
{- Runs an action handler.
-
- Exceptions are ignored, otherwise a whole thread could be crashed.
-}
-runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler g handler file filestatus = void $
+runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus transferqueue g handler file filestatus = void $
either print (const noop) =<< tryIO go
where
- go = handler g file filestatus
+ go = handler st dstatus transferqueue g file filestatus
{- Called when there's an error with inotify. -}
onErr :: Handler
-onErr _ msg _ = error msg
+onErr _ _ _ _ msg _ = error msg
{- Called when a new branch ref is written.
-
@@ -65,14 +66,16 @@ onErr _ msg _ = error msg
- ran are merged in.
-}
onAdd :: Handler
-onAdd g file _
+onAdd st dstatus transferqueue g file _
| ".lock" `isSuffixOf` file = noop
- | isAnnexBranch file = noop
- | "/synced/" `isInfixOf` file = go =<< Git.Branch.current g
+ | isAnnexBranch file = runThreadState st $
+ whenM Annex.Branch.forceUpdate $
+ queueDeferredDownloads Later transferqueue dstatus
+ | "/synced/" `isInfixOf` file = mergecurrent =<< Git.Branch.current g
| otherwise = noop
where
changedbranch = fileToBranch file
- go (Just current)
+ mergecurrent (Just current)
| equivBranches changedbranch current = do
liftIO $ debug thisThread
[ "merging"
@@ -81,7 +84,7 @@ onAdd g file _
, show current
]
void $ Git.Merge.mergeNonInteractive changedbranch g
- go _ = noop
+ mergecurrent _ = noop
equivBranches :: Git.Ref -> Git.Ref -> Bool
equivBranches x y = base x == base y
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 3d0464c73..21479d04d 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -11,6 +11,7 @@ module Assistant.TransferQueue (
newTransferQueue,
getTransferQueue,
queueTransfers,
+ queueDeferredDownloads,
queueTransfer,
queueTransferAt,
queueTransferWhenSmall,
@@ -32,6 +33,7 @@ import qualified Data.Map as M
data TransferQueue = TransferQueue
{ queuesize :: TVar Int
, queuelist :: TVar [(Transfer, TransferInfo)]
+ , deferreddownloads :: TVar [(Key, AssociatedFile)]
}
data Schedule = Next | Later
@@ -41,48 +43,78 @@ newTransferQueue :: IO TransferQueue
newTransferQueue = atomically $ TransferQueue
<$> newTVar 0
<*> newTVar []
+ <*> newTVar []
{- Reads the queue's content without blocking or changing it. -}
getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
getTransferQueue q = atomically $ readTVar $ queuelist q
stubInfo :: AssociatedFile -> Remote -> TransferInfo
-stubInfo f r = TransferInfo
- { startedTime = Nothing
- , transferPid = Nothing
- , transferTid = Nothing
- , transferRemote = Just r
- , bytesComplete = Nothing
+stubInfo f r = stubTransferInfo
+ { transferRemote = Just r
, associatedFile = f
- , transferPaused = False
}
{- Adds transfers to queue for some of the known remotes. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers schedule q dstatus k f direction = do
- rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
- mapM_ go =<< sufficientremotes rs
+ rs <- sufficientremotes
+ =<< knownRemotes <$> liftIO (getDaemonStatus dstatus)
+ if null rs
+ then defer
+ else forM_ rs $ \r -> liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
where
sufficientremotes rs
- -- Queue downloads from all remotes that
- -- have the key, with the cheapest ones first.
- -- More expensive ones will only be tried if
- -- downloading from a cheap one fails.
+ {- Queue downloads from all remotes that
+ - have the key, with the cheapest ones first.
+ - More expensive ones will only be tried if
+ - downloading from a cheap one fails. -}
| direction == Download = do
uuids <- Remote.keyLocations k
return $ filter (\r -> uuid r `elem` uuids) rs
- -- TODO: Determine a smaller set of remotes that
- -- can be uploaded to, in order to ensure all
- -- remotes can access the content. Currently,
- -- send to every remote we can.
+ {- TODO: Determine a smaller set of remotes that
+ - can be uploaded to, in order to ensure all
+ - remotes can access the content. Currently,
+ - send to every remote we can. -}
| otherwise = return $ filter (not . Remote.readonly) rs
gentransfer r = Transfer
{ transferDirection = direction
, transferKey = k
, transferUUID = Remote.uuid r
}
- go r = liftIO $
- enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
+ defer
+ {- Defer this download, as no known remote has the key. -}
+ | direction == Download = void $ liftIO $ atomically $
+ modifyTVar' (deferreddownloads q) $
+ \l -> (k, f):l
+ | otherwise = noop
+
+{- Queues any deferred downloads that can now be accomplished, leaving
+ - any others in the list to try again later. -}
+queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex ()
+queueDeferredDownloads schedule q dstatus = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
+ l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
+ left <- filterM (queue rs) l
+ unless (null left) $
+ liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
+ \new -> new ++ left
+ where
+ queue rs (k, f) = do
+ uuids <- Remote.keyLocations k
+ let sources = filter (\r -> uuid r `elem` uuids) rs
+ unless (null sources) $
+ forM_ sources $ \r -> liftIO $
+ enqueue schedule q dstatus
+ (gentransfer r) (stubInfo f r)
+ return $ null sources
+ where
+ gentransfer r = Transfer
+ { transferDirection = Download
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
enqueue schedule q dstatus t info
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index 222adae5e..b9c6016b2 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -39,7 +39,7 @@ all the other git clones, at both the git level and the key/value level.
Possible solution: C could record a download intent. (Similar to a failed
download, but with an unknown source.) When C next receives a git-annex
branch push, it could try to requeue downloads that it has such intents
- registered for.
+ registered for. **done**
Note that this solution won't cover use cases the other does. For example,
connect a USB drive A; B syncs files from it, and then should pass them to C.
@@ -85,6 +85,10 @@ all the other git clones, at both the git level and the key/value level.
need to use the TransferScanner, if we get and check a list of the changed
files.
* [[use multiple transfer slots|todo/Slow_transfer_for_a_lot_of_small_files.]]
+* The TransferQueue's list of deferred downloads could theoretically
+ grow without bounds in memory. Limit it to a given number of entries,
+ and fall back to some other method -- either storing deferred downloads
+ on disk, or perhaps scheduling a TransferScanner run to get back into sync.
## data syncing