summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-07-05 10:21:22 -0600
committerGravatar Joey Hess <joey@kitenet.net>2012-07-05 10:21:22 -0600
commit83c66ccaf88a10e8f4b16fc2162cbed2656b95e0 (patch)
tree6b1807e02096a81c96f788d3fd4305f89ea2018e /Assistant
parentb0894f00c075e4dd93a692880e8eb0ea865b6c28 (diff)
queue Uploads of newly added files to remotes
Added knownRemotes to DaemonStatus. This list is not entirely trivial to calculate, and having it here should make it easier to add/remove remotes on the fly later on. It did require plumbing the daemonstatus through to some more threads.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/DaemonStatus.hs8
-rw-r--r--Assistant/Threads/Committer.hs28
-rw-r--r--Assistant/Threads/Pusher.hs11
-rw-r--r--Assistant/TransferQueue.hs29
4 files changed, 52 insertions, 24 deletions
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 10161a96c..a3e909904 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -11,13 +11,14 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
+import Logs.Transfer
+import qualified Command.Sync
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
-import Logs.Transfer
import qualified Data.Map as M
data DaemonStatus = DaemonStatus
@@ -31,6 +32,8 @@ data DaemonStatus = DaemonStatus
, lastSanityCheck :: Maybe POSIXTime
-- Currently running file content transfers
, currentTransfers :: M.Map Transfer TransferInfo
+ -- Ordered list of remotes to talk to.
+ , knownRemotes :: [Remote]
}
deriving (Show)
@@ -43,6 +46,7 @@ newDaemonStatus = DaemonStatus
, sanityCheckRunning = False
, lastSanityCheck = Nothing
, currentTransfers = M.empty
+ , knownRemotes = []
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
@@ -59,10 +63,12 @@ startDaemonStatus = do
status <- liftIO $
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
transfers <- M.fromList <$> getTransfers
+ remotes <- Command.Sync.syncRemotes []
liftIO $ newMVar status
{ scanComplete = False
, sanityCheckRunning = False
, currentTransfers = transfers
+ , knownRemotes = remotes
}
{- This thread wakes up periodically and writes the daemon status to disk. -}
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index 488056fa2..ff5cc9eab 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -12,6 +12,9 @@ import Assistant.Changes
import Assistant.Commits
import Assistant.ThreadedMonad
import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -29,8 +32,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
-commitThread st changechan commitchan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -39,7 +42,7 @@ commitThread st changechan commitchan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds st changechan changes
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
@@ -97,8 +100,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
-handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@@ -110,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan
+ r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@@ -121,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) = do
- r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- handle (finishedChange change) (keyFilename ks)
- =<< Command.Add.ingest ks
- return $ maybeMaybe r
+ add change@(PendingAddChange { keySource = ks }) =
+ liftM maybeMaybe $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
add _ = return Nothing
maybeMaybe (Just j@(Just _)) = j
@@ -141,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
+ queueTransfers transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 04d343528..6d6836120 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -10,6 +10,7 @@ module Assistant.Threads.Pusher where
import Common.Annex
import Assistant.Commits
import Assistant.Pushes
+import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Threads.Merger
import qualified Command.Sync
@@ -32,9 +33,8 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
halfhour = 1800
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
-pushThread st commitchan pushmap = do
- remotes <- runThreadState st $ Command.Sync.syncRemotes []
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st daemonstatus commitchan pushmap = do
runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
@@ -42,7 +42,10 @@ pushThread st commitchan pushmap = do
-- Now see if now's a good time to push.
now <- getCurrentTime
if shouldPush now commits
- then pushToRemotes now st pushmap remotes
+ then do
+ remotes <- runThreadState st $
+ knownRemotes <$> getDaemonStatus daemonstatus
+ pushToRemotes now st pushmap remotes
else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes.
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 979cbb80f..fc25b057d 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -8,9 +8,10 @@
module Assistant.TransferQueue where
import Common.Annex
-import Utility.TSet
+import Assistant.DaemonStatus
import Logs.Transfer
import Types.Remote
+import qualified Remote
import Control.Concurrent.STM
@@ -28,15 +29,29 @@ stubInfo f = TransferInfo
, associatedFile = f
}
+{- Adds pending transfers to the end of the queue for some of the known
+ - remotes. (TBD: a smaller set of remotes that are sufficient to transfer to,
+ - rather than transferring to all.) -}
+queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers q daemonstatus k f direction =
+ mapM_ (liftIO . queueTransfer q f . gentransfer)
+ =<< knownRemotes <$> getDaemonStatus daemonstatus
+ where
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferRemote = Remote.uuid r
+ }
+
{- Adds a pending transfer to the end of the queue. -}
-queueTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
-queueTransfer q transfer f = void $ atomically $
- writeTChan q (transfer, stubInfo f)
+queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransfer q f t = void $ atomically $
+ writeTChan q (t, stubInfo f)
{- Adds a pending transfer to the start of the queue, to be processed next. -}
-queueNextTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
-queueNextTransfer q transfer f = void $ atomically $
- unGetTChan q (transfer, stubInfo f)
+queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueNextTransfer q f t = void $ atomically $
+ unGetTChan q (t, stubInfo f)
{- Blocks until a pending transfer is available in the queue. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)