aboutsummaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2017-09-20 14:37:20 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2017-09-20 15:29:13 -0400
commit3826ef1923a35ef8794f0e3beb0f94f0f40fd9be (patch)
tree0a0af9d00654786c9196f8f13c973e597cfb65fd /Assistant
parentd20933a25956a3a07247f66fe3a554761d616173 (diff)
add exporter thread to assistant
This is similar to the pusher thread, but a separate thread because git pushes can be done in parallel with exports, and updating a big export should not prevent other git pushes going out in the meantime. The exportThread only runs at most every 30 seconds, since updating an export is more expensive than pushing. This may need to be tuned. Added a separate channel for export commits; the committer records a commit in that channel. Also, reconnectRemotes records a dummy commit, to make the exporter thread wake up and make sure all exports are up-to-date. So, connecting a drive with a directory special remote export will immediately update it, and getting online will automatically update S3 and WebDAV exports. The transfer queue is not involved in exports. Instead, failed exports are retried much like failed pushes. This commit was sponsored by Ewen McNeill.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Commits.hs9
-rw-r--r--Assistant/Monad.hs4
-rw-r--r--Assistant/Pushes.hs21
-rw-r--r--Assistant/Sync.hs14
-rw-r--r--Assistant/Threads/Committer.hs1
-rw-r--r--Assistant/Threads/Exporter.hs78
-rw-r--r--Assistant/Threads/Pusher.hs1
-rw-r--r--Assistant/Threads/TransferScanner.hs5
8 files changed, 114 insertions, 19 deletions
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
index c82f8f4c7..255648c94 100644
--- a/Assistant/Commits.hs
+++ b/Assistant/Commits.hs
@@ -21,3 +21,12 @@ getCommits = (atomically . getTList) <<~ commitChan
{- Records a commit in the channel. -}
recordCommit :: Assistant ()
recordCommit = (atomically . flip consTList Commit) <<~ commitChan
+
+{- Gets all unhandled export commits.
+ - Blocks until at least one export commit is made. -}
+getExportCommits :: Assistant [Commit]
+getExportCommits = (atomically . getTList) <<~ exportCommitChan
+
+{- Records an export commit in the channel. -}
+recordExportCommit :: Assistant ()
+recordExportCommit = (atomically . flip consTList Commit) <<~ exportCommitChan
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index e52983915..403ee16a8 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -62,7 +62,9 @@ data AssistantData = AssistantData
, transferSlots :: TransferSlots
, transferrerPool :: TransferrerPool
, failedPushMap :: FailedPushMap
+ , failedExportMap :: FailedPushMap
, commitChan :: CommitChan
+ , exportCommitChan :: CommitChan
, changePool :: ChangePool
, repoProblemChan :: RepoProblemChan
, branchChangeHandle :: BranchChangeHandle
@@ -80,6 +82,8 @@ newAssistantData st dstatus = AssistantData
<*> newTransferSlots
<*> newTransferrerPool (checkNetworkConnections dstatus)
<*> newFailedPushMap
+ <*> newFailedPushMap
+ <*> newCommitChan
<*> newCommitChan
<*> newChangePool
<*> newRepoProblemChan
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
index 7b4de450f..61891ea28 100644
--- a/Assistant/Pushes.hs
+++ b/Assistant/Pushes.hs
@@ -17,24 +17,21 @@ import qualified Data.Map as M
{- Blocks until there are failed pushes.
- Returns Remotes whose pushes failed a given time duration or more ago.
- (This may be an empty list.) -}
-getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote]
-getFailedPushesBefore duration = do
- v <- getAssistant failedPushMap
- liftIO $ do
- m <- atomically $ readTMVar v
- now <- getCurrentTime
- return $ M.keys $ M.filter (not . toorecent now) m
+getFailedPushesBefore :: NominalDiffTime -> FailedPushMap -> Assistant [Remote]
+getFailedPushesBefore duration v = liftIO $ do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
where
toorecent now time = now `diffUTCTime` time < duration
{- Modifies the map. -}
-changeFailedPushMap :: (PushMap -> PushMap) -> Assistant ()
-changeFailedPushMap a = do
- v <- getAssistant failedPushMap
- liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> Assistant ()
+changeFailedPushMap v f = liftIO $ atomically $
+ store . f . fromMaybe M.empty =<< tryTakeTMVar v
where
{- tryTakeTMVar empties the TMVar; refill it only if
- the modified map is not itself empty -}
- store v m
+ store m
| m == M.empty = noop
| otherwise = putTMVar v $! m
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index aba90f64c..c6460e9ed 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -33,6 +33,7 @@ import Assistant.Threads.Watcher (watchThread, WatcherControl(..))
import Assistant.TransferSlots
import Assistant.TransferQueue
import Assistant.RepoProblem
+import Assistant.Commits
import Types.Transfer
import Data.Time.Clock
@@ -48,10 +49,10 @@ import Control.Concurrent
- it's sufficient to requeue failed transfers.
-
- Also handles signaling any connectRemoteNotifiers, after the syncing is
- - done.
+ - done, and records an export commit to make any exports be updated.
-}
reconnectRemotes :: [Remote] -> Assistant ()
-reconnectRemotes [] = noop
+reconnectRemotes [] = recordExportCommit
reconnectRemotes rs = void $ do
rs' <- liftIO $ filterM (Remote.checkAvailable True) rs
unless (null rs') $ do
@@ -60,6 +61,7 @@ reconnectRemotes rs = void $ do
whenM (liftIO $ Remote.checkAvailable False r) $
repoHasProblem (Remote.uuid r) (syncRemote r)
mapM_ signal $ filter (`notElem` failedrs) rs'
+ recordExportCommit
where
gitremotes = filter (notspecialremote . Remote.repo) rs
(_xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
@@ -143,9 +145,11 @@ pushToRemotes' now remotes = do
then retry currbranch g u failed
else fallback branch g u failed
- updatemap succeeded failed = changeFailedPushMap $ \m ->
- M.union (makemap failed) $
- M.difference m (makemap succeeded)
+ updatemap succeeded failed = do
+ v <- getAssistant failedPushMap
+ changeFailedPushMap v $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
makemap l = M.fromList $ zip l (repeat now)
retry currbranch g u rs = do
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 3680349be..aa57d26a8 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -67,6 +67,7 @@ commitThread = namedThread "Committer" $ do
void $ alertWhile commitAlert $
liftAnnex $ commitStaged msg
recordCommit
+ recordExportCommit
let numchanges = length readychanges
mapM_ checkChangeContent readychanges
return numchanges
diff --git a/Assistant/Threads/Exporter.hs b/Assistant/Threads/Exporter.hs
new file mode 100644
index 000000000..747e919da
--- /dev/null
+++ b/Assistant/Threads/Exporter.hs
@@ -0,0 +1,78 @@
+{- git-annex assistant export updating thread
+ -
+ - Copyright 2017 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Exporter where
+
+import Assistant.Common
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.DaemonStatus
+import Annex.Concurrent
+import Utility.ThreadScheduler
+import qualified Annex
+import qualified Remote
+import qualified Types.Remote as Remote
+import qualified Command.Sync
+
+import Control.Concurrent.Async
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- This thread retries exports that failed before. -}
+exportRetryThread :: NamedThread
+exportRetryThread = namedThread "ExportRetrier" $ runEvery (Seconds halfhour) <~> do
+ -- We already waited half an hour, now wait until there are failed
+ -- exports to retry.
+ toexport <- getFailedPushesBefore (fromIntegral halfhour)
+ =<< getAssistant failedExportMap
+ unless (null toexport) $ do
+ debug ["retrying", show (length toexport), "failed exports"]
+ void $ exportToRemotes toexport
+ where
+ halfhour = 1800
+
+{- This thread updates exports soon after git commits are made. -}
+exportThread :: NamedThread
+exportThread = namedThread "Exporter" $ runEvery (Seconds 30) <~> do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ void getExportCommits
+ -- Now see if now's a good time to push.
+ void $ exportToRemotes =<< exportTargets
+
+{- We want to avoid exporting to remotes that are marked readonly.
+ -
+ - Also, avoid exporting to local remotes we can easily tell are not available,
+ - to avoid ugly messages when a removable drive is not attached.
+ -}
+exportTargets :: Assistant [Remote]
+exportTargets = liftIO . filterM (Remote.checkAvailable True)
+ =<< candidates <$> getDaemonStatus
+ where
+ candidates = filter (not . Remote.readonly) . exportRemotes
+
+exportToRemotes :: [Remote] -> Assistant ()
+exportToRemotes rs = do
+ -- This is a long-duration action which runs in the Annex monad,
+ -- so don't just liftAnnex to run it; fork the Annex state.
+ runner <- liftAnnex $ forkState $
+ forM rs $ \r -> do
+ Annex.changeState $ \st -> st { Annex.errcounter = 0 }
+ start <- liftIO getCurrentTime
+ void $ Command.Sync.seekExportContent rs
+ -- Look at command error counter to see if the export
+ -- didn't work.
+ failed <- (> 0) <$> Annex.getState Annex.errcounter
+ Annex.changeState $ \st -> st { Annex.errcounter = 0 }
+ return $ if failed
+ then Just (r, start)
+ else Nothing
+ failed <- catMaybes
+ <$> (liftAnnex =<< liftIO . wait =<< liftIO (async runner))
+ unless (null failed) $ do
+ v <- getAssistant failedExportMap
+ changeFailedPushMap v $ M.union $ M.fromList failed
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 5b4055885..82bd39d89 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -22,6 +22,7 @@ pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore (fromIntegral halfhour)
+ =<< getAssistant failedPushMap
unless (null topush) $ do
debug ["retrying", show (length topush), "failed pushes"]
void $ pushToRemotes topush
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index fd77b88d2..e76c0b4fb 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -59,8 +59,9 @@ transferScannerThread urlrenderer = namedThread "TransferScanner" $ do
(s { transferScanRunning = b }, s)
liftIO $ sendNotification $ transferNotifier ds
- {- All git remotes are synced, and all available remotes
- - are scanned in full on startup, for multiple reasons, including:
+ {- All git remotes are synced, all exports are updated,
+ - and all available remotes are scanned in full on startup,
+ - for multiple reasons, including:
-
- * This may be the first run, and there may be remotes
- already in place, that need to be synced.