summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs96
-rw-r--r--Assistant/Changes.hs27
-rw-r--r--Assistant/Commits.hs34
-rw-r--r--Assistant/DaemonStatus.hs41
-rw-r--r--Assistant/Pushes.hs46
-rw-r--r--Assistant/ThreadedMonad.hs31
-rw-r--r--Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs)36
-rw-r--r--Assistant/Threads/Merger.hs87
-rw-r--r--Assistant/Threads/Pusher.hs90
-rw-r--r--Assistant/Threads/SanityChecker.hs (renamed from Assistant/SanityChecker.hs)25
-rw-r--r--Assistant/Threads/TransferWatcher.hs83
-rw-r--r--Assistant/Threads/Transferrer.hs105
-rw-r--r--Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs)47
-rw-r--r--Assistant/TransferQueue.hs75
-rw-r--r--Assistant/TransferSlots.hs40
-rw-r--r--Command/Assistant.hs18
-rw-r--r--Command/Status.hs4
-rw-r--r--Command/Sync.hs43
-rw-r--r--Command/Watch.hs18
-rw-r--r--GitAnnex.hs2
-rw-r--r--Logs/Transfer.hs53
-rw-r--r--Utility/DirWatcher.hs36
-rw-r--r--Utility/Parallel.hs22
-rw-r--r--Utility/TSet.hs39
-rw-r--r--Utility/Types/DirWatcher.hs3
-rw-r--r--doc/design/assistant/syncing.mdwn6
-rw-r--r--doc/git-annex.mdwn4
27 files changed, 955 insertions, 156 deletions
diff --git a/Assistant.hs b/Assistant.hs
index e924d9477..91ebf2d2e 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -21,15 +21,35 @@
- until this is complete.
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- - index, then commits.
- - Thread 6: status logger
+ - index, then commits. Also queues Transfer events to send added
+ - files to other remotes.
+ - Thread 6: pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: push retrier
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch.
+ - (This uses inotify on .git/refs/heads, so there are additional
+ - inotify threads associated with it, too.)
+ - Thread 9: transfer watcher
+ - Watches for transfer information files being created and removed,
+ - and maintains the DaemonStatus currentTransfers map and the
+ - TransferSlots QSemN.
+ - (This uses inotify on .git/annex/transfer/, so there are
+ - additional inotify threads associated with it, too.)
+ - Thread 10: transferrer
+ - Waits for Transfers to be queued and does them.
+ - Thread 11: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 12: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
- - Annex monad in IO actions run by the inotify and committer
+ - Annex monad in IO actions run by the watcher and committer
- threads. Thus, a single state is shared among the threads, and
- only one at a time can access it.
- DaemonStatusHandle: (MVar)
@@ -39,6 +59,20 @@
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
+ - FailedPushMap (STM TMVar)
+ - Failed pushes are indicated by writing to this TMVar. The push
+ - retrier blocks until they're available.
+ - TransferQueue (STM TChan)
+ - Transfers to make are indicated by writing to this channel.
+ - TransferSlots (QSemN)
+ - Count of the number of currently available transfer slots.
+ - Updated by the transfer watcher, this allows other threads
+ - to block until a slot is available.
+ - This MVar should only be manipulated from inside the Annex monad,
+ - which ensures it's accessed only after the ThreadState MVar.
-}
module Assistant where
@@ -47,39 +81,55 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
-import Assistant.Watcher
-import Assistant.Committer
-import Assistant.SanityChecker
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Threads.Watcher
+import Assistant.Threads.Committer
+import Assistant.Threads.Pusher
+import Assistant.Threads.Merger
+import Assistant.Threads.TransferWatcher
+import Assistant.Threads.Transferrer
+import Assistant.Threads.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
+import Utility.ThreadScheduler
import Control.Concurrent
-startDaemon :: Bool -> Annex ()
-startDaemon foreground
+startDaemon :: Bool -> Bool -> Annex ()
+startDaemon assistant foreground
| foreground = do
- showStart "watch" "."
+ showStart (if assistant then "assistant" else "watch") "."
go id
| otherwise = do
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
pidfile <- fromRepo gitAnnexPidFile
go $ Utility.Daemon.daemonize logfd (Just pidfile) False
where
- go a = withThreadState $ \st -> do
+ go daemonize = withThreadState $ \st -> do
checkCanWatch
dstatus <- startDaemonStatus
- liftIO $ a $ do
- changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
- _ <- forkIO $ daemonStatusThread st dstatus
- _ <- forkIO $ sanityCheckerThread st dstatus changechan
- -- Does not return.
- watchThread st dstatus changechan
+ liftIO $ daemonize $ run dstatus st
+ run dstatus st = do
+ changechan <- newChangeChan
+ commitchan <- newCommitChan
+ pushmap <- newFailedPushMap
+ transferqueue <- newTransferQueue
+ transferslots <- newTransferSlots
+ mapM_ (void . forkIO)
+ [ commitThread st changechan commitchan transferqueue dstatus
+ , pushThread st dstatus commitchan pushmap
+ , pushRetryThread st pushmap
+ , mergeThread st
+ , transferWatcherThread st dstatus transferslots
+ , transfererThread st dstatus transferqueue transferslots
+ , daemonStatusThread st dstatus
+ , sanityCheckerThread st dstatus transferqueue changechan
+ , watchThread st dstatus transferqueue changechan
+ ]
+ waitForTermination
stopDaemon :: Annex ()
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..eca922109 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -1,6 +1,8 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Changes where
@@ -8,14 +10,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +31,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +64,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..86fd7599f
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index e5ba3d151..64c441cee 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -1,6 +1,8 @@
{- git-annex assistant daemon status
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.DaemonStatus where
@@ -9,12 +11,15 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
+import Logs.Transfer
+import qualified Command.Sync
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
+import qualified Data.Map as M
data DaemonStatus = DaemonStatus
-- False when the daemon is performing its startup scan
@@ -25,9 +30,15 @@ data DaemonStatus = DaemonStatus
, sanityCheckRunning :: Bool
-- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime
+ -- Currently running file content transfers
+ , currentTransfers :: TransferMap
+ -- Ordered list of remotes to talk to.
+ , knownRemotes :: [Remote]
}
deriving (Show)
+type TransferMap = M.Map Transfer TransferInfo
+
type DaemonStatusHandle = MVar DaemonStatus
newDaemonStatus :: DaemonStatus
@@ -36,24 +47,33 @@ newDaemonStatus = DaemonStatus
, lastRunning = Nothing
, sanityCheckRunning = False
, lastSanityCheck = Nothing
+ , currentTransfers = M.empty
+ , knownRemotes = []
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
getDaemonStatus = liftIO . readMVar
-modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
-modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
+modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
+modifyDaemonStatus_ handle a = liftIO $ modifyMVar_ handle (return . a)
+
+modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b
+modifyDaemonStatus handle a = liftIO $ modifyMVar handle (return . a)
{- Load any previous daemon status file, and store it in the MVar for this
- - process to use as its DaemonStatus. -}
+ - process to use as its DaemonStatus. Also gets current transfer status. -}
startDaemonStatus :: Annex DaemonStatusHandle
startDaemonStatus = do
file <- fromRepo gitAnnexDaemonStatusFile
status <- liftIO $
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
+ transfers <- M.fromList <$> getTransfers
+ remotes <- Command.Sync.syncRemotes []
liftIO $ newMVar status
{ scanComplete = False
, sanityCheckRunning = False
+ , currentTransfers = transfers
+ , knownRemotes = remotes
}
{- This thread wakes up periodically and writes the daemon status to disk. -}
@@ -117,3 +137,18 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
+
+{- Mutates the transfer map. -}
+adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
+adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
+ \s -> s { currentTransfers = a (currentTransfers s) }
+
+{- Removes a transfer from the map, and returns its info. -}
+removeTransfer :: DaemonStatusHandle -> Transfer -> Annex (Maybe TransferInfo)
+removeTransfer dstatus t = modifyDaemonStatus dstatus go
+ where
+ go s =
+ let (info, ts) = M.updateLookupWithKey
+ (\_k _v -> Nothing)
+ t (currentTransfers s)
+ in (s { currentTransfers = ts }, info)
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f411dda07
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,46 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+
+import Control.Concurrent.STM
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Track the most recent push failure for each remote. -}
+type PushMap = M.Map Remote UTCTime
+type FailedPushMap = TMVar PushMap
+
+{- The TMVar starts empty, and is left empty when there are no
+ - failed pushes. This way we can block until there are some failed pushes.
+ -}
+newFailedPushMap :: IO FailedPushMap
+newFailedPushMap = atomically newEmptyTMVar
+
+{- Blocks until there are failed pushes.
+ - Returns Remotes whose pushes failed a given time duration or more ago.
+ - (This may be an empty list.) -}
+getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
+getFailedPushesBefore v duration = do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
+ where
+ toorecent now time = now `diffUTCTime` time < duration
+
+{- Modifies the map. -}
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
+changeFailedPushMap v a = atomically $
+ store . a . fromMaybe M.empty =<< tryTakeTMVar v
+ where
+ {- tryTakeTMVar empties the TMVar; refill it only if
+ - the modified map is not itself empty -}
+ store m
+ | m == M.empty = noop
+ | otherwise = putTMVar v $! m
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..16f3a9dd9 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -1,17 +1,18 @@
{- making the Annex monad available across threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE BangPatterns #-}
-
module Assistant.ThreadedMonad where
import Common.Annex
import qualified Annex
import Control.Concurrent
-import Control.Exception (throw)
+import Data.Tuple
+import System.Posix.Types
{- The Annex state is stored in a MVar, so that threaded actions can access
- it. -}
@@ -32,13 +33,19 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
-runThreadState mvar a = do
- startstate <- takeMVar mvar
- -- catch IO errors and rethrow after restoring the MVar
- !(r, newstate) <- catchIO (Annex.run startstate a) $ \e -> do
- putMVar mvar startstate
- throw e
- putMVar mvar newstate
- return r
+runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
+
+{- Runs an Annex action in a separate process, using a copy of the state
+ - from the MVar.
+ -
+ - It's up to the action to perform any necessary shutdown tasks in order
+ - for state to not be lost. And it's up to the caller to resynchronise
+ - with any changes the action makes to eg, the git-annex branch.
+ -}
+unsafeForkProcessThreadState :: ThreadState -> Annex a -> IO ProcessID
+unsafeForkProcessThreadState mvar a = do
+ state <- readMVar mvar
+ forkProcess $ void $ Annex.eval state a
diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs
index 63df8cafc..ff5cc9eab 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -1,14 +1,20 @@
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.Committer where
+module Assistant.Threads.Committer where
import Common.Annex
import Assistant.Changes
+import Assistant.Commits
import Assistant.ThreadedMonad
-import Assistant.Watcher
+import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -26,8 +32,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -36,10 +42,11 @@ commitThread st changechan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds st changechan changes
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes
@@ -93,8 +100,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
-handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@@ -106,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan
+ r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@@ -117,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) = do
- r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- handle (finishedChange change) (keyFilename ks)
- =<< Command.Add.ingest ks
- return $ maybeMaybe r
+ add change@(PendingAddChange { keySource = ks }) =
+ liftM maybeMaybe $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
add _ = return Nothing
maybeMaybe (Just j@(Just _)) = j
@@ -137,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
+ queueTransfers transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..c7da86a8d
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,87 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Annex.Branch
+import qualified Git
+import qualified Git.Command
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+import qualified Remote
+
+{- This thread watches for changes to .git/refs/heads/synced/*,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let changedbranch = Git.Ref $
+ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just changedbranch == current) $
+ void $ mergeBranch changedbranch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
+
+{- Manually pull from remotes and merge their branches. Called by the pusher
+ - when a push fails, which can happen due to a remote not having pushed
+ - changes to us. That could be because it doesn't have us as a remote, or
+ - because the assistant is not running there, or other reasons. -}
+manualPull :: Git.Ref -> [Remote] -> Annex ()
+manualPull currentbranch remotes = do
+ forM_ remotes $ \r ->
+ inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r]
+ Annex.Branch.forceUpdate
+ forM_ remotes $ \r ->
+ Command.Sync.mergeRemote r currentbranch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..6d6836120
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,90 @@
+{- git-annex assistant git pushing threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Common.Annex
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.DaemonStatus
+import Assistant.ThreadedMonad
+import Assistant.Threads.Merger
+import qualified Command.Sync
+import Utility.ThreadScheduler
+import Utility.Parallel
+
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> FailedPushMap -> IO ()
+pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ now <- getCurrentTime
+ pushToRemotes now st pushmap topush
+ where
+ halfhour = 1800
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st daemonstatus commitchan pushmap = do
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then do
+ remotes <- runThreadState st $
+ knownRemotes <$> getDaemonStatus daemonstatus
+ pushToRemotes now st pushmap remotes
+ else refillCommits commitchan commits
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
+
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
+pushToRemotes now st pushmap remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> Command.Sync.currentBranch
+ go True branch g remotes
+ where
+ go shouldretry branch g rs = do
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ (succeeded, failed) <- inParallel (push g branch) rs
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
+ unless (null failed || not shouldretry) $
+ retry branch g failed
+
+ makemap l = M.fromList $ zip l (repeat now)
+
+ push g branch remote =
+ ifM (Command.Sync.pushBranch remote branch g)
+ ( exitSuccess, exitFailure)
+
+ retry branch g rs = do
+ runThreadState st $ manualPull branch rs
+ go False branch g rs
diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index e2ca9da74..c5b99863e 100644
--- a/Assistant/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -1,9 +1,11 @@
{- git-annex assistant sanity checker
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.SanityChecker (
+module Assistant.Threads.SanityChecker (
sanityCheckerThread
) where
@@ -12,26 +14,27 @@ import qualified Git.LsFiles
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Changes
+import Assistant.TransferQueue
import Utility.ThreadScheduler
-import qualified Assistant.Watcher
+import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
-sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-sanityCheckerThread st status changechan = forever $ do
+sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+sanityCheckerThread st status transferqueue changechan = forever $ do
waitForNextCheck st status
runThreadState st $
- modifyDaemonStatus status $ \s -> s
+ modifyDaemonStatus_ status $ \s -> s
{ sanityCheckRunning = True }
now <- getPOSIXTime -- before check started
- catchIO (check st status changechan)
+ catchIO (check st status transferqueue changechan)
(runThreadState st . warning . show)
runThreadState st $ do
- modifyDaemonStatus status $ \s -> s
+ modifyDaemonStatus_ status $ \s -> s
{ sanityCheckRunning = False
, lastSanityCheck = Just now
}
@@ -56,8 +59,8 @@ oneDay = 24 * 60 * 60
{- It's important to stay out of the Annex monad as much as possible while
- running potentially expensive parts of this check, since remaining in it
- will block the watcher. -}
-check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-check st status changechan = do
+check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+check st status transferqueue changechan = do
g <- runThreadState st $ do
showSideAction "Running daily check"
fromRepo id
@@ -77,5 +80,5 @@ check st status changechan = do
insanity m = runThreadState st $ warning m
addsymlink file s = do
insanity $ "found unstaged symlink: " ++ file
- Assistant.Watcher.runHandler st status changechan
- Assistant.Watcher.onAddSymlink file s
+ Watcher.runHandler st status transferqueue changechan
+ Watcher.onAddSymlink file s
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
new file mode 100644
index 000000000..5be63fce4
--- /dev/null
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -0,0 +1,83 @@
+{- git-annex assistant transfer watching thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferWatcher where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferSlots
+import Logs.Transfer
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import Annex.BranchState
+
+import Data.Map as M
+
+{- This thread watches for changes to the gitAnnexTransferDir,
+ - and updates the DaemonStatus's map of ongoing transfers. -}
+transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferSlots -> IO ()
+transferWatcherThread st dstatus transferslots = do
+ g <- runThreadState st $ fromRepo id
+ let dir = gitAnnexTransferDir g
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler st dstatus transferslots a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+
+type Handler = ThreadState -> DaemonStatusHandle -> TransferSlots -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus transferslots handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler st dstatus transferslots file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ _ _ msg _ = error msg
+
+{- Called when a new transfer information file is written. -}
+onAdd :: Handler
+onAdd st dstatus _ file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> runThreadState st $ go t =<< checkTransfer t
+ where
+ go _ Nothing = noop -- transfer already finished
+ go t (Just info) = adjustTransfers dstatus $
+ M.insertWith' merge t info
+ -- preserve shouldWait flag, which is not written to disk
+ merge new old = new { shouldWait = shouldWait old }
+
+{- Called when a transfer information file is removed.
+ -
+ - When the transfer process is a child of this process, wait on it
+ - to avoid zombies.
+ -}
+onDel :: Handler
+onDel st dstatus transferslots file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> maybe noop waitchild
+ =<< runThreadState st (removeTransfer dstatus t)
+ where
+ waitchild info
+ | shouldWait info = case transferPid info of
+ Nothing -> noop
+ Just pid -> do
+ void $ tryIO $
+ getProcessStatus True False pid
+ runThreadState st invalidateCache
+ transferComplete transferslots
+ | otherwise = noop
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
new file mode 100644
index 000000000..9d3358f54
--- /dev/null
+++ b/Assistant/Threads/Transferrer.hs
@@ -0,0 +1,105 @@
+{- git-annex assistant data transferrer thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Transferrer where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Logs.Transfer
+import Logs.Presence
+import Logs.Location
+import Annex.Content
+import qualified Remote
+
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- For now only one transfer is run at a time.
+ -
+ - NOTE(review): nothing else in this module refers to maxTransfers;
+ - confirm whether it is still needed.
+ -}
+maxTransfers :: Int
+maxTransfers = 1
+
+{- Loops forever, taking the next pending transfer off the queue and
+ - running it when shouldTransfer approves of it. -}
+transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
+transfererThread st dstatus transferqueue slots = dispatch
+  where
+    dispatch = do
+        (t, info) <- getNextTransfer transferqueue
+        wanted <- runThreadState st $ shouldTransfer dstatus t info
+        when wanted $
+            runTransfer st dstatus slots t info
+        dispatch
+
+{- Decides whether a queued transfer should be run. The answer is
+ - False when the transfer is already among the current transfers,
+ - when a download's key is already in the annex, or when an upload has
+ - no remote or the location log already lists the remote for the key.
+ -}
+shouldTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> Annex Bool
+shouldTransfer dstatus t info = do
+    running <- currentTransfers <$> getDaemonStatus dstatus
+    if M.member t running
+        then return False
+        else bydirection (transferDirection t)
+  where
+    key = transferKey t
+    bydirection Download = not <$> inAnnex key
+    bydirection Upload = maybe (return False) remotelacks (transferRemote info)
+    bydirection _ = return False
+    {- Trust the location log to check if the remote already has
+     - the key. This avoids a roundtrip to the remote. -}
+    remotelacks remote = notElem (Remote.uuid remote) <$> loggedLocations key
+
+{- A transfer is run in a separate process, with a *copy* of the Annex
+ - state. This is necessary to avoid blocking the rest of the assistant
+ - on the transfer completing, and also to allow multiple transfers to run
+ - at once.
+ -
+ - However, it means that the transfer processes are responsible
+ - for doing any necessary shutdown cleanups, and that the parent
+ - thread's cache must be invalidated once a transfer completes, as
+ - changes may have been made to the git-annex branch.
+ -
+ - Nothing is run unless the TransferInfo carries both a remote and an
+ - associated file.
+ -}
+runTransfer :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> IO ()
+runTransfer st dstatus slots t info = case (transferRemote info, associatedFile info) of
+ (Nothing, _) -> noop
+ (_, Nothing) -> noop
+ (Just remote, Just file) -> do
+ -- inTransferSlot waits for a free slot before the process is forked
+ pid <- inTransferSlot slots $
+ unsafeForkProcessThreadState st $
+ transferprocess remote file
+ now <- getCurrentTime
+ -- record the forked process; shouldWait = True is what makes the
+ -- TransferWatcher's onDel wait on the pid and free the slot
+ runThreadState st $ adjustTransfers dstatus $
+ M.insertWith' const t info
+ { startedTime = Just now
+ , transferPid = Just pid
+ , shouldWait = True
+ }
+ where
+ isdownload = transferDirection t == Download
+ tofrom
+ | isdownload = "from"
+ | otherwise = "to"
+ key = transferKey t
+
+ -- the action run in the forked process
+ transferprocess remote file = do
+ showStart "copy" file
+ showAction $ tofrom ++ " " ++ Remote.name remote
+ ok <- transfer t (Just file) $
+ if isdownload
+ then getViaTmp key $
+ Remote.retrieveKeyFile remote key (Just file)
+ else do
+ -- this inner ok shadows the outer one; the remote's status
+ -- is only logged when storing the key succeeded
+ ok <- Remote.storeKey remote key $ Just file
+ when ok $
+ Remote.logStatus remote key InfoPresent
+ return ok
+ showEndResult ok
diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs
index db58f01e8..9f0eba74e 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -7,12 +7,14 @@
{-# LANGUAGE CPP #-}
-module Assistant.Watcher where
+module Assistant.Threads.Watcher where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
+import Assistant.TransferQueue
+import Logs.Transfer
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Annex
@@ -27,7 +29,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -46,11 +47,11 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain."
]
-watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-watchThread st dstatus changechan = watchDir "." ignored hooks startup
+watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+watchThread st dstatus transferqueue changechan = void $ watchDir "." ignored hooks startup
where
startup = statupScan st dstatus
- hook a = Just $ runHandler st dstatus changechan a
+ hook a = Just $ runHandler st dstatus transferqueue changechan a
hooks = WatchHooks
{ addHook = hook onAdd
, delHook = hook onDel
@@ -66,7 +67,7 @@ statupScan st dstatus scanner = do
showAction "scanning"
r <- scanner
runThreadState st $
- modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
+ modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
-- Notice any files that were deleted before watching was started.
runThreadState st $ do
@@ -83,23 +84,22 @@ ignored = ig . takeFileName
ig ".gitattributes" = True
ig _ = False
-type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
+type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a
- change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
-runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler st dstatus changechan handler file filestatus = void $ do
+runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus transferqueue changechan handler file filestatus = void $ do
r <- tryIO go
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
- go = runThreadState st $ handler file filestatus dstatus
+ go = runThreadState st $ handler file filestatus dstatus transferqueue
{- During initial directory scan, this will be run for any regular files
- that are already checked into git. We don't want to turn those into
@@ -120,7 +120,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
- the add.
-}
onAdd :: Handler
-onAdd file filestatus dstatus
+onAdd file filestatus dstatus _
| maybe False isRegularFile filestatus = do
ifM (scanComplete <$> getDaemonStatus dstatus)
( go
@@ -138,12 +138,15 @@ onAdd file filestatus dstatus
- before adding it.
-}
onAddSymlink :: Handler
-onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
+onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
where
go (Just (key, _)) = do
link <- calcGitLink file key
ifM ((==) link <$> liftIO (readSymbolicLink file))
- ( ensurestaged link =<< getDaemonStatus dstatus
+ ( do
+ s <- getDaemonStatus dstatus
+ checkcontent key s
+ ensurestaged link s
, do
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
@@ -185,8 +188,16 @@ onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
stageSymlink file sha
madeChange file LinkChange
+ {- When a new link appears, after the startup scan,
+ - try to get the key's content. -}
+ checkcontent key daemonstatus
+ | scanComplete daemonstatus = unlessM (inAnnex key) $
+ queueTransfers transferqueue dstatus
+ key (Just file) Download
+ | otherwise = noop
+
onDel :: Handler
-onDel file _ _dstatus = do
+onDel file _ _dstatus _ = do
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
madeChange file RmChange
@@ -199,14 +210,14 @@ onDel file _ _dstatus = do
- command to get the recursive list of files in the directory, so rm is
- just as good. -}
onDelDir :: Handler
-onDelDir dir _ _dstatus = do
+onDelDir dir _ _dstatus _ = do
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange
{- Called when there's an error with inotify. -}
onErr :: Handler
-onErr msg _ _dstatus = do
+onErr msg _ _dstatus _ = do
warning msg
return Nothing
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
new file mode 100644
index 000000000..73e73ca0a
--- /dev/null
+++ b/Assistant/TransferQueue.hs
@@ -0,0 +1,75 @@
+{- git-annex assistant pending transfer queue
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferQueue where
+
+import Common.Annex
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Types.Remote
+import qualified Remote
+
+import Control.Concurrent.STM
+
+{- The queue of pending transfers, held in an STM channel. -}
+type TransferQueue = TChan (Transfer, TransferInfo)
+
+{- Creates a new TransferQueue. -}
+newTransferQueue :: IO TransferQueue
+newTransferQueue = atomically newTChan
+
+{- A TransferInfo for a transfer that has not been started: only the
+ - associated file is filled in, every other Maybe field is Nothing, and
+ - shouldWait is False. -}
+stubInfo :: AssociatedFile -> TransferInfo
+stubInfo f = TransferInfo
+ { startedTime = Nothing
+ , transferPid = Nothing
+ , transferRemote = Nothing
+ , bytesComplete = Nothing
+ , associatedFile = f
+ , shouldWait = False
+ }
+
+{- Appends to the queue one pending transfer of the key, in the given
+ - direction, for each suitable remote among the known remotes from the
+ - daemon status. -}
+queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers q daemonstatus k f direction = do
+    rs <- knownRemotes <$> getDaemonStatus daemonstatus
+    targets <- sufficientremotes rs
+    mapM_ enqueue targets
+  where
+    sufficientremotes l
+        -- Downloads are queued from every known remote that has
+        -- the key, in the order the known remotes are listed
+        -- (presumably cheapest first -- TODO confirm), so later
+        -- ones only matter if an earlier download fails.
+        | direction == Download = do
+            uuids <- Remote.keyLocations k
+            return $ filter (\r -> uuid r `elem` uuids) l
+        -- TODO: Determine a smaller set of remotes that
+        -- can be uploaded to, in order to ensure all
+        -- remotes can access the content. Currently,
+        -- send to every remote we can.
+        | otherwise = return l
+    enqueue r = liftIO $ atomically $
+        writeTChan q (gentransfer r, infofor r)
+    infofor r = (stubInfo f) { transferRemote = Just r }
+    gentransfer r = Transfer
+        { transferDirection = direction
+        , transferKey = k
+        , transferUUID = Remote.uuid r
+        }
+
+{- Appends a pending transfer, with stub info for the associated file,
+ - to the end of the queue. -}
+queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransfer q f t = atomically (writeTChan q pending)
+  where
+    pending = (t, stubInfo f)
+
+{- Puts a pending transfer, with stub info for the associated file, at
+ - the start of the queue, so that it is the next one read. -}
+queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueNextTransfer q f t = atomically (unGetTChan q pending)
+  where
+    pending = (t, stubInfo f)
+
+{- Takes the next pending transfer off the queue, blocking until one
+ - is available. -}
+getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
+getNextTransfer q = atomically (readTChan q)
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
new file mode 100644
index 000000000..1859b281b
--- /dev/null
+++ b/Assistant/TransferSlots.hs
@@ -0,0 +1,40 @@
+{- git-annex assistant transfer slots
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferSlots where
+
+import Control.Exception
+import Control.Concurrent
+
+{- Transfer slots are represented by a quantity semaphore. -}
+type TransferSlots = QSemN
+
+{- Number of concurrent transfers allowed to be run from the assistant.
+ -
+ - Transfers launched by other means, including by remote assistants,
+ - do not currently take up slots.
+ -}
+numSlots :: Int
+numSlots = 1
+
+{- Creates the semaphore, initially holding numSlots units. -}
+newTransferSlots :: IO TransferSlots
+newTransferSlots = newQSemN numSlots
+
+{- Takes a transfer slot, waiting until one is free, and then runs the
+ - action. The slot is only given back here when the action throws an
+ - exception; after a normal return it stays taken until the
+ - TransferWatcher calls transferComplete for the finished transfer.
+ -}
+inTransferSlot :: TransferSlots -> IO a -> IO a
+inTransferSlot s a = bracketOnError
+    (waitQSemN s 1)
+    (\_ -> transferComplete s)
+    (\_ -> a)
+
+{- Gives one slot back to the semaphore. Call when a transfer is
+ - complete. -}
+transferComplete :: TransferSlots -> IO ()
+transferComplete s = s `signalQSemN` 1
diff --git a/Command/Assistant.hs b/Command/Assistant.hs
new file mode 100644
index 000000000..60eac5d21
--- /dev/null
+++ b/Command/Assistant.hs
@@ -0,0 +1,18 @@
+{- git-annex assistant
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Command.Assistant where
+
+import Command
+import qualified Command.Watch
+
+{- The "assistant" command. It takes no parameters and accepts the
+ - foreground and stop options defined in Command.Watch. -}
+def :: [Command]
+def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $
+ command "assistant" paramNothing seek "automatically handle changes"]
+
+{- Reuses Command.Watch's seek, built with its assistant flag set to
+ - True. -}
+seek :: [CommandSeek]
+seek = Command.Watch.mkSeek True
diff --git a/Command/Status.hs b/Command/Status.hs
index eff21bb50..2d63c525c 100644
--- a/Command/Status.hs
+++ b/Command/Status.hs
@@ -186,8 +186,8 @@ transfer_list = stat "transfers in progress" $ nojson $ lift $ do
[ show (transferDirection t) ++ "ing"
, fromMaybe (show $ transferKey t) (associatedFile i)
, if transferDirection t == Upload then "to" else "from"
- , maybe (fromUUID $ transferRemote t) Remote.name $
- M.lookup (transferRemote t) uuidmap
+ , maybe (fromUUID $ transferUUID t) Remote.name $
+ M.lookup (transferUUID t) uuidmap
]
disk_size :: Stat
diff --git a/Command/Sync.hs b/Command/Sync.hs
index bdb5d47a7..dfaed5949 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -39,7 +39,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
- !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@@ -49,6 +49,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
+
+currentBranch :: Annex Git.Ref
+currentBranch = do
+ !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ return branch
where
nobranch = error "no branch is checked out"
@@ -98,7 +103,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
- updateBranch syncbranch
+ inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@@ -107,17 +112,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
- updateBranch $ syncBranch branch
+ inRepo $ updateBranch $ syncBranch branch
stop
-updateBranch :: Git.Ref -> Annex ()
-updateBranch syncbranch =
+updateBranch :: Git.Ref -> Git.Repo -> IO ()
+updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
- go = inRepo $ Git.Command.runBool "branch"
+ go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
- ]
+ ] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@@ -143,19 +148,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
- needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
+ needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
- inRepo $ Git.Command.runBool "push"
- [ Param (Remote.name remote)
- , Param (show Annex.Branch.name)
- , Param refspec
- ]
- refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
- syncbranch = syncBranch branch
+ inRepo $ pushBranch remote branch
+
+{- Runs git push to the remote, pushing the git-annex branch and the
+ - given branch, the latter to its sync branch on the remote side. -}
+pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
+pushBranch remote branch g = Git.Command.runBool "push" params g
+  where
+    params = map Param
+        [ Remote.name remote
+        , show Annex.Branch.name
+        , base branch ++ ":" ++ base (syncBranch branch)
+        ]
+    base = show . Git.Ref.base
mergeAnnex :: CommandStart
mergeAnnex = do
diff --git a/Command/Watch.hs b/Command/Watch.hs
index 5681b3861..744844c4d 100644
--- a/Command/Watch.hs
+++ b/Command/Watch.hs
@@ -1,6 +1,3 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE BangPatterns #-}
-
{- git-annex watch command
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
@@ -19,10 +16,13 @@ def :: [Command]
def = [withOptions [foregroundOption, stopOption] $
command "watch" paramNothing seek "watch for changes"]
-seek :: [CommandSeek]
-seek = [withFlag stopOption $ \stopdaemon ->
+mkSeek :: Bool -> [CommandSeek]
+mkSeek assistant = [withFlag stopOption $ \stopdaemon ->
withFlag foregroundOption $ \foreground ->
- withNothing $ start foreground stopdaemon]
+ withNothing $ start assistant foreground stopdaemon]
+
+seek :: [CommandSeek]
+seek = mkSeek False
foregroundOption :: Option
foregroundOption = Option.flag [] "foreground" "do not daemonize"
@@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize"
stopOption :: Option
stopOption = Option.flag [] "stop" "stop daemon"
-start :: Bool -> Bool -> CommandStart
-start foreground stopdaemon = notBareRepo $ do
+start :: Bool -> Bool -> Bool -> CommandStart
+start assistant foreground stopdaemon = notBareRepo $ do
if stopdaemon
then stopDaemon
- else startDaemon foreground -- does not return
+ else startDaemon assistant foreground -- does not return
stop
diff --git a/GitAnnex.hs b/GitAnnex.hs
index bf1f27bfd..7b1fa5986 100644
--- a/GitAnnex.hs
+++ b/GitAnnex.hs
@@ -62,6 +62,7 @@ import qualified Command.Upgrade
import qualified Command.Version
#ifdef WITH_ASSISTANT
import qualified Command.Watch
+import qualified Command.Assistant
#endif
cmds :: [Command]
@@ -106,6 +107,7 @@ cmds = concat
, Command.Version.def
#ifdef WITH_ASSISTANT
, Command.Watch.def
+ , Command.Assistant.def
#endif
]
diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs
index f808cb6a4..8b8804127 100644
--- a/Logs/Transfer.hs
+++ b/Logs/Transfer.hs
@@ -1,4 +1,4 @@
-{- git-annex transfer information files
+{- git-annex transfer information files and lock files
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
@@ -14,7 +14,6 @@ import qualified Git
import Types.Remote
import qualified Fields
-import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock
@@ -22,7 +21,7 @@ import Data.Time.Clock
- of the transfer information file. -}
data Transfer = Transfer
{ transferDirection :: Direction
- , transferRemote :: UUID
+ , transferUUID :: UUID
, transferKey :: Key
}
deriving (Show, Eq, Ord)
@@ -36,9 +35,10 @@ data Transfer = Transfer
data TransferInfo = TransferInfo
{ startedTime :: Maybe UTCTime
, transferPid :: Maybe ProcessID
- , transferThread :: Maybe ThreadId
+ , transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath
+ , shouldWait :: Bool
}
deriving (Show, Eq, Ord)
@@ -66,9 +66,9 @@ fieldTransfer direction key a = do
maybe a (\u -> transfer (Transfer direction (toUUID u) key) afile a)
=<< Fields.getField Fields.remoteUUID
-{- Runs a transfer action. Creates and locks the transfer information file
- - while the action is running. Will throw an error if the transfer is
- - already in progress.
+{- Runs a transfer action. Creates and locks the lock file while the
+ - action is running, and stores info in the transfer information
+ - file. Will throw an error if the transfer is already in progress.
-}
transfer :: Transfer -> Maybe FilePath -> Annex a -> Annex a
transfer t file a = do
@@ -78,25 +78,25 @@ transfer t file a = do
info <- liftIO $ TransferInfo
<$> (Just <$> getCurrentTime)
<*> pure Nothing -- pid not stored in file, so omitted for speed
- <*> pure Nothing -- threadid not stored in file, so omitted for speed
<*> pure Nothing -- not 0; transfer may be resuming
+ <*> pure Nothing
<*> pure file
+ <*> pure False
bracketIO (prep tfile mode info) (cleanup tfile) a
where
prep tfile mode info = do
- fd <- openFd tfile ReadWrite (Just mode)
+ fd <- openFd (transferLockFile tfile) ReadWrite (Just mode)
defaultFileFlags { trunc = True }
locked <- catchMaybeIO $
setLock fd (WriteLock, AbsoluteSeek, 0, 0)
when (locked == Nothing) $
error $ "transfer already in progress"
- h <- fdToHandle fd
- hPutStr h $ writeTransferInfo info
- hFlush h
- return h
- cleanup tfile h = do
+ writeFile tfile $ writeTransferInfo info
+ return fd
+ cleanup tfile fd = do
removeFile tfile
- hClose h
+ removeFile $ transferLockFile tfile
+ closeFd fd
{- If a transfer is still running, returns its TransferInfo. -}
checkTransfer :: Transfer -> Annex (Maybe TransferInfo)
@@ -104,22 +104,19 @@ checkTransfer t = do
mode <- annexFileMode
tfile <- fromRepo $ transferFile t
mfd <- liftIO $ catchMaybeIO $
- openFd tfile ReadOnly (Just mode) defaultFileFlags
+ openFd (transferLockFile tfile) ReadOnly (Just mode) defaultFileFlags
case mfd of
Nothing -> return Nothing -- failed to open file; not running
Just fd -> do
locked <- liftIO $
getLock fd (WriteLock, AbsoluteSeek, 0, 0)
+ liftIO $ closeFd fd
case locked of
- Nothing -> do
- liftIO $ closeFd fd
- return Nothing
- Just (pid, _) -> liftIO $ do
- h <- fdToHandle fd
- info <- readTransferInfo pid
- <$> hGetContentsStrict h
- hClose h
- return info
+ Nothing -> return Nothing
+ Just (pid, _) -> liftIO $
+ flip catchDefaultIO Nothing $
+ readTransferInfo pid
+ <$> readFile tfile
{- Gets all currently running transfers. -}
getTransfers :: Annex [(Transfer, TransferInfo)]
@@ -140,6 +137,10 @@ transferFile (Transfer direction u key) r = gitAnnexTransferDir r
</> fromUUID u
</> keyFile key
+{- Names the lock file that goes with a transfer info file: the info
+ - file's path with ".lck" appended. -}
+transferLockFile :: FilePath -> FilePath
+transferLockFile = (++ ".lck")
+
{- Parses a transfer information filename to a Transfer. -}
parseTransferFile :: FilePath -> Maybe Transfer
parseTransferFile file =
@@ -156,7 +157,6 @@ writeTransferInfo :: TransferInfo -> String
writeTransferInfo info = unlines
-- transferPid is not included; instead obtained by looking at
-- the process that locks the file.
- -- transferThread is not included; not relevant for other processes
[ show $ startedTime info
-- bytesComplete is not included; changes too fast
, fromMaybe "" $ associatedFile info -- comes last; arbitrary content
@@ -171,6 +171,7 @@ readTransferInfo pid s =
<*> pure Nothing
<*> pure Nothing
<*> pure (if null filename then Nothing else Just filename)
+ <*> pure False
_ -> Nothing
where
(bits, filebits) = splitAt 1 $ lines s
diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs
index 11ce7baef..213aeb50a 100644
--- a/Utility/DirWatcher.hs
+++ b/Utility/DirWatcher.hs
@@ -17,10 +17,10 @@ import Utility.Types.DirWatcher
#if WITH_INOTIFY
import qualified Utility.INotify as INotify
import qualified System.INotify as INotify
-import Utility.ThreadScheduler
#endif
#if WITH_KQUEUE
import qualified Utility.Kqueue as Kqueue
+import Control.Concurrent
#endif
type Pruner = FilePath -> Bool
@@ -72,19 +72,41 @@ closingTracked = undefined
#endif
#endif
+/* Starts a watcher thread. The runStartup action is passed a scanner action
+ * to run, that will return once the initial directory scan is complete.
+ * Once runStartup returns, the watcher thread continues running,
+ * and processing events. Returns a DirWatcherHandle that can be used
+ * to shut the watcher down later (see stopWatchDir). */
#if WITH_INOTIFY
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
-watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do
+type DirWatcherHandle = INotify.INotify
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
+watchDir dir prune hooks runstartup = do
+ i <- INotify.initINotify
runstartup $ INotify.watchDir i dir prune hooks
- waitForTermination -- Let the inotify thread run.
+ return i
#else
#if WITH_KQUEUE
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO ()
+type DirWatcherHandle = ThreadId
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO DirWatcherHandle
watchDir dir ignored hooks runstartup = do
kq <- runstartup $ Kqueue.initKqueue dir ignored
- Kqueue.runHooks kq hooks
+ forkIO $ Kqueue.runHooks kq hooks
#else
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
+type DirWatcherHandle = ()
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
watchDir = undefined
#endif
#endif
+
+{- Shuts down a watcher, given the handle that watchDir returned. -}
+stopWatchDir :: DirWatcherHandle -> IO ()
+#if WITH_INOTIFY
+stopWatchDir = INotify.killINotify
+#else
+#if WITH_KQUEUE
+stopWatchDir = killThread
+#else
+stopWatchDir = undefined
+#endif
+#endif
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
new file mode 100644
index 000000000..9df95ab2b
--- /dev/null
+++ b/Utility/Parallel.hs
@@ -0,0 +1,22 @@
+{- parallel processes
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Utility.Parallel where
+
+import Common
+
+{- Forks one process per value, each running the action on its value,
+ - then waits for all of them. Returns the values split into those whose
+ - process exited successfully and those whose process did not. -}
+inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v])
+inParallel a l = do
+    pids <- mapM (forkProcess . a) l
+    statuses <- mapM (getProcessStatus True False) pids
+    let (good, bad) = partition (succeeded . snd) (zip l statuses)
+    return (map fst good, map fst bad)
+  where
+    succeeded = (== Just (Exited ExitSuccess))
diff --git a/Utility/TSet.hs b/Utility/TSet.hs
new file mode 100644
index 000000000..24d345477
--- /dev/null
+++ b/Utility/TSet.hs
@@ -0,0 +1,39 @@
+{- Transactional sets
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Utility.TSet where
+
+import Common
+
+import Control.Concurrent.STM
+
+{- A TSet is just an STM channel. -}
+type TSet = TChan
+
+{- Runs an STM transaction on a TSet. -}
+runTSet :: STM a -> IO a
+runTSet = atomically
+
+{- Creates a new TSet. -}
+newTSet :: IO (TSet a)
+newTSet = atomically newTChan
+
+{- Empties the TSet in a single transaction, blocking until it holds
+ - at least one item. Items come back in the reverse of the order in
+ - which they were read from the channel. -}
+getTSet :: TSet a -> IO [a]
+getTSet tset = runTSet $ readTChan tset >>= drain . (: [])
+  where
+    drain acc = tryReadTChan tset >>= maybe (return acc) (drain . (: acc))
+
+{- Writes all the items to the TSet in a single transaction. -}
+putTSet :: TSet a -> [a] -> IO ()
+putTSet tset = runTSet . mapM_ (writeTChan tset)
+
+{- Writes one item to the TSet. -}
+putTSet1 :: TSet a -> a -> IO ()
+putTSet1 tset = runTSet . writeTChan tset
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index c828a0593..ba7eae6a1 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
+
+{- A WatchHooks with all five hooks unset (Nothing); callers override
+ - the ones they need with record update syntax. -}
+mkWatchHooks :: WatchHooks
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index a9f59f496..66502ec85 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -35,6 +35,12 @@ all the other git clones, at both the git level and the key/value level.
(will need to use `GIT_SSH`, which needs to point to a command to run,
not a shell command line)
+## misc todo
+
+* --debug will show often unnecessary work being done. Optimise.
+* It would be nice if, when a USB drive is connected,
+ syncing starts automatically. Use dbus on Linux?
+
## data syncing
There are two parts to data syncing. First, map the network and second,
diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn
index c52a5f3bf..85a5a18f0 100644
--- a/doc/git-annex.mdwn
+++ b/doc/git-annex.mdwn
@@ -185,6 +185,10 @@ subdirectories).
To not daemonize, run with --foreground ; to stop a running daemon,
run with --stop
+* assistant
+
+ Like watch, but also automatically syncs changes to other remotes.
+
# REPOSITORY SETUP COMMANDS
* init [description]