summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Commits.hs9
-rw-r--r--Assistant/Monad.hs4
-rw-r--r--Assistant/Pushes.hs21
-rw-r--r--Assistant/Sync.hs14
-rw-r--r--Assistant/Threads/Committer.hs1
-rw-r--r--Assistant/Threads/Exporter.hs78
-rw-r--r--Assistant/Threads/Pusher.hs1
-rw-r--r--Assistant/Threads/TransferScanner.hs5
8 files changed, 114 insertions, 19 deletions
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
index c82f8f4c7..255648c94 100644
--- a/Assistant/Commits.hs
+++ b/Assistant/Commits.hs
@@ -21,3 +21,12 @@ getCommits = (atomically . getTList) <<~ commitChan
{- Records a commit in the channel. -}
recordCommit :: Assistant ()
recordCommit = (atomically . flip consTList Commit) <<~ commitChan
+
+{- Gets all unhandled export commits.
+ - Blocks until at least one export commit is made. -}
+getExportCommits :: Assistant [Commit]
+getExportCommits = (atomically . getTList) <<~ exportCommitChan
+
+{- Records an export commit in the channel. -}
+recordExportCommit :: Assistant ()
+recordExportCommit = (atomically . flip consTList Commit) <<~ exportCommitChan
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index e52983915..403ee16a8 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -62,7 +62,9 @@ data AssistantData = AssistantData
, transferSlots :: TransferSlots
, transferrerPool :: TransferrerPool
, failedPushMap :: FailedPushMap
+ , failedExportMap :: FailedPushMap
, commitChan :: CommitChan
+ , exportCommitChan :: CommitChan
, changePool :: ChangePool
, repoProblemChan :: RepoProblemChan
, branchChangeHandle :: BranchChangeHandle
@@ -80,6 +82,8 @@ newAssistantData st dstatus = AssistantData
<*> newTransferSlots
<*> newTransferrerPool (checkNetworkConnections dstatus)
<*> newFailedPushMap
+ <*> newFailedPushMap
+ <*> newCommitChan
<*> newCommitChan
<*> newChangePool
<*> newRepoProblemChan
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
index 7b4de450f..61891ea28 100644
--- a/Assistant/Pushes.hs
+++ b/Assistant/Pushes.hs
@@ -17,24 +17,21 @@ import qualified Data.Map as M
{- Blocks until there are failed pushes.
- Returns Remotes whose pushes failed a given time duration or more ago.
- (This may be an empty list.) -}
-getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote]
-getFailedPushesBefore duration = do
- v <- getAssistant failedPushMap
- liftIO $ do
- m <- atomically $ readTMVar v
- now <- getCurrentTime
- return $ M.keys $ M.filter (not . toorecent now) m
+getFailedPushesBefore :: NominalDiffTime -> FailedPushMap -> Assistant [Remote]
+getFailedPushesBefore duration v = liftIO $ do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
where
toorecent now time = now `diffUTCTime` time < duration
{- Modifies the map. -}
-changeFailedPushMap :: (PushMap -> PushMap) -> Assistant ()
-changeFailedPushMap a = do
- v <- getAssistant failedPushMap
- liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> Assistant ()
+changeFailedPushMap v f = liftIO $ atomically $
+ store . f . fromMaybe M.empty =<< tryTakeTMVar v
where
{- tryTakeTMVar empties the TMVar; refill it only if
- the modified map is not itself empty -}
- store v m
+ store m
| m == M.empty = noop
| otherwise = putTMVar v $! m
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index aba90f64c..c6460e9ed 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -33,6 +33,7 @@ import Assistant.Threads.Watcher (watchThread, WatcherControl(..))
import Assistant.TransferSlots
import Assistant.TransferQueue
import Assistant.RepoProblem
+import Assistant.Commits
import Types.Transfer
import Data.Time.Clock
@@ -48,10 +49,10 @@ import Control.Concurrent
- it's sufficient to requeue failed transfers.
-
- Also handles signaling any connectRemoteNotifiers, after the syncing is
- - done.
+ - done, and records an export commit to make any exports be updated.
-}
reconnectRemotes :: [Remote] -> Assistant ()
-reconnectRemotes [] = noop
+reconnectRemotes [] = recordExportCommit
reconnectRemotes rs = void $ do
rs' <- liftIO $ filterM (Remote.checkAvailable True) rs
unless (null rs') $ do
@@ -60,6 +61,7 @@ reconnectRemotes rs = void $ do
whenM (liftIO $ Remote.checkAvailable False r) $
repoHasProblem (Remote.uuid r) (syncRemote r)
mapM_ signal $ filter (`notElem` failedrs) rs'
+ recordExportCommit
where
gitremotes = filter (notspecialremote . Remote.repo) rs
(_xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
@@ -143,9 +145,11 @@ pushToRemotes' now remotes = do
then retry currbranch g u failed
else fallback branch g u failed
- updatemap succeeded failed = changeFailedPushMap $ \m ->
- M.union (makemap failed) $
- M.difference m (makemap succeeded)
+ updatemap succeeded failed = do
+ v <- getAssistant failedPushMap
+ changeFailedPushMap v $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
makemap l = M.fromList $ zip l (repeat now)
retry currbranch g u rs = do
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 3680349be..aa57d26a8 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -67,6 +67,7 @@ commitThread = namedThread "Committer" $ do
void $ alertWhile commitAlert $
liftAnnex $ commitStaged msg
recordCommit
+ recordExportCommit
let numchanges = length readychanges
mapM_ checkChangeContent readychanges
return numchanges
diff --git a/Assistant/Threads/Exporter.hs b/Assistant/Threads/Exporter.hs
new file mode 100644
index 000000000..747e919da
--- /dev/null
+++ b/Assistant/Threads/Exporter.hs
@@ -0,0 +1,78 @@
+{- git-annex assistant export updating thread
+ -
+ - Copyright 2017 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Exporter where
+
+import Assistant.Common
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.DaemonStatus
+import Annex.Concurrent
+import Utility.ThreadScheduler
+import qualified Annex
+import qualified Remote
+import qualified Types.Remote as Remote
+import qualified Command.Sync
+
+import Control.Concurrent.Async
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- This thread retries exports that failed before. -}
+exportRetryThread :: NamedThread
+exportRetryThread = namedThread "ExportRetrier" $ runEvery (Seconds halfhour) <~> do
+ -- We already waited half an hour, now wait until there are failed
+ -- exports to retry.
+ toexport <- getFailedPushesBefore (fromIntegral halfhour)
+ =<< getAssistant failedExportMap
+ unless (null toexport) $ do
+ debug ["retrying", show (length toexport), "failed exports"]
+ void $ exportToRemotes toexport
+ where
+ halfhour = 1800
+
+{- This thread updates exports soon after git commits are made. -}
+exportThread :: NamedThread
+exportThread = namedThread "Exporter" $ runEvery (Seconds 30) <~> do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ void getExportCommits
+ -- Now see if now's a good time to push.
+ void $ exportToRemotes =<< exportTargets
+
+{- We want to avoid exporting to remotes that are marked readonly.
+ -
+ - Also, avoid exporting to local remotes we can easily tell are not available,
+ - to avoid ugly messages when a removable drive is not attached.
+ -}
+exportTargets :: Assistant [Remote]
+exportTargets = liftIO . filterM (Remote.checkAvailable True)
+ =<< candidates <$> getDaemonStatus
+ where
+ candidates = filter (not . Remote.readonly) . exportRemotes
+
+exportToRemotes :: [Remote] -> Assistant ()
+exportToRemotes rs = do
+ -- This is a long-duration action which runs in the Annex monad,
+ -- so don't just liftAnnex to run it; fork the Annex state.
+ runner <- liftAnnex $ forkState $
+ forM rs $ \r -> do
+ Annex.changeState $ \st -> st { Annex.errcounter = 0 }
+ start <- liftIO getCurrentTime
+ void $ Command.Sync.seekExportContent rs
+ -- Look at command error counter to see if the export
+ -- didn't work.
+ failed <- (> 0) <$> Annex.getState Annex.errcounter
+ Annex.changeState $ \st -> st { Annex.errcounter = 0 }
+ return $ if failed
+ then Just (r, start)
+ else Nothing
+ failed <- catMaybes
+ <$> (liftAnnex =<< liftIO . wait =<< liftIO (async runner))
+ unless (null failed) $ do
+ v <- getAssistant failedExportMap
+ changeFailedPushMap v $ M.union $ M.fromList failed
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 5b4055885..82bd39d89 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -22,6 +22,7 @@ pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore (fromIntegral halfhour)
+ =<< getAssistant failedPushMap
unless (null topush) $ do
debug ["retrying", show (length topush), "failed pushes"]
void $ pushToRemotes topush
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index fd77b88d2..e76c0b4fb 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -59,8 +59,9 @@ transferScannerThread urlrenderer = namedThread "TransferScanner" $ do
(s { transferScanRunning = b }, s)
liftIO $ sendNotification $ transferNotifier ds
- {- All git remotes are synced, and all available remotes
- - are scanned in full on startup, for multiple reasons, including:
+ {- All git remotes are synced, all exports are updated,
+ - and all available remotes are scanned in full on startup,
+ - for multiple reasons, including:
-
- * This may be the first run, and there may be remotes
- already in place, that need to be synced.