summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs9
-rw-r--r--Assistant/DaemonStatus.hs8
-rw-r--r--Assistant/Threads/Committer.hs28
-rw-r--r--Assistant/Threads/Pusher.hs11
-rw-r--r--Assistant/TransferQueue.hs29
5 files changed, 58 insertions, 27 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 40f53d55e..548850e92 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -21,7 +21,8 @@
- until this is complete.
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- - index, then commits.
+ - index, then commits. Also queues Transfer events to send added
+ - files to other remotes.
- Thread 6: pusher
- Waits for commits to be made, and pushes updated branches to remotes,
- in parallel. (Forks a process for each git push.)
@@ -73,6 +74,7 @@ import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
import Assistant.Pushes
+import Assistant.TransferQueue
import Assistant.Threads.Watcher
import Assistant.Threads.Committer
import Assistant.Threads.Pusher
@@ -103,9 +105,10 @@ startDaemon assistant foreground
changechan <- newChangeChan
commitchan <- newCommitChan
pushmap <- newFailedPushMap
+ transferqueue <- newTransferQueue
mapM_ (void . forkIO)
- [ commitThread st changechan commitchan
- , pushThread st commitchan pushmap
+ [ commitThread st changechan commitchan transferqueue dstatus
+ , pushThread st dstatus commitchan pushmap
, pushRetryThread st pushmap
, mergeThread st
, transferWatcherThread st dstatus
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 10161a96c..a3e909904 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -11,13 +11,14 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
+import Logs.Transfer
+import qualified Command.Sync
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
-import Logs.Transfer
import qualified Data.Map as M
data DaemonStatus = DaemonStatus
@@ -31,6 +32,8 @@ data DaemonStatus = DaemonStatus
, lastSanityCheck :: Maybe POSIXTime
-- Currently running file content transfers
, currentTransfers :: M.Map Transfer TransferInfo
+ -- Ordered list of remotes to talk to.
+ , knownRemotes :: [Remote]
}
deriving (Show)
@@ -43,6 +46,7 @@ newDaemonStatus = DaemonStatus
, sanityCheckRunning = False
, lastSanityCheck = Nothing
, currentTransfers = M.empty
+ , knownRemotes = []
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
@@ -59,10 +63,12 @@ startDaemonStatus = do
status <- liftIO $
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
transfers <- M.fromList <$> getTransfers
+ remotes <- Command.Sync.syncRemotes []
liftIO $ newMVar status
{ scanComplete = False
, sanityCheckRunning = False
, currentTransfers = transfers
+ , knownRemotes = remotes
}
{- This thread wakes up periodically and writes the daemon status to disk. -}
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 488056fa2..ff5cc9eab 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -12,6 +12,9 @@ import Assistant.Changes
import Assistant.Commits
import Assistant.ThreadedMonad
import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -29,8 +32,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
-commitThread st changechan commitchan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -39,7 +42,7 @@ commitThread st changechan commitchan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds st changechan changes
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
@@ -97,8 +100,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
-handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@@ -110,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan
+ r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@@ -121,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) = do
- r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- handle (finishedChange change) (keyFilename ks)
- =<< Command.Add.ingest ks
- return $ maybeMaybe r
+ add change@(PendingAddChange { keySource = ks }) =
+ liftM maybeMaybe $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
add _ = return Nothing
maybeMaybe (Just j@(Just _)) = j
@@ -141,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
+ queueTransfers transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 04d343528..6d6836120 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -10,6 +10,7 @@ module Assistant.Threads.Pusher where
import Common.Annex
import Assistant.Commits
import Assistant.Pushes
+import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Threads.Merger
import qualified Command.Sync
@@ -32,9 +33,8 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
halfhour = 1800
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
-pushThread st commitchan pushmap = do
- remotes <- runThreadState st $ Command.Sync.syncRemotes []
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st daemonstatus commitchan pushmap = do
runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
@@ -42,7 +42,10 @@ pushThread st commitchan pushmap = do
-- Now see if now's a good time to push.
now <- getCurrentTime
if shouldPush now commits
- then pushToRemotes now st pushmap remotes
+ then do
+ remotes <- runThreadState st $
+ knownRemotes <$> getDaemonStatus daemonstatus
+ pushToRemotes now st pushmap remotes
else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes.
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 979cbb80f..fc25b057d 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -8,9 +8,10 @@
module Assistant.TransferQueue where
import Common.Annex
-import Utility.TSet
+import Assistant.DaemonStatus
import Logs.Transfer
import Types.Remote
+import qualified Remote
import Control.Concurrent.STM
@@ -28,15 +29,29 @@ stubInfo f = TransferInfo
, associatedFile = f
}
+{- Adds pending transfers to the end of the queue for some of the known
+ - remotes. (TBD: a smaller set of remotes that are sufficient to transfer to,
+ - rather than transferring to all.) -}
+queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers q daemonstatus k f direction =
+ mapM_ (liftIO . queueTransfer q f . gentransfer)
+ =<< knownRemotes <$> getDaemonStatus daemonstatus
+ where
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferRemote = Remote.uuid r
+ }
+
{- Adds a pending transfer to the end of the queue. -}
-queueTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
-queueTransfer q transfer f = void $ atomically $
- writeTChan q (transfer, stubInfo f)
+queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransfer q f t = void $ atomically $
+ writeTChan q (t, stubInfo f)
{- Adds a pending transfer to the start of the queue, to be processed next. -}
-queueNextTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
-queueNextTransfer q transfer f = void $ atomically $
- unGetTChan q (transfer, stubInfo f)
+queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueNextTransfer q f t = void $ atomically $
+ unGetTChan q (t, stubInfo f)
{- Blocks until a pending transfer is available in the queue. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)