diff options
56 files changed, 1301 insertions, 323 deletions
diff --git a/Annex/UUID.hs b/Annex/UUID.hs index 517840fba..09862f9fc 100644 --- a/Annex/UUID.hs +++ b/Annex/UUID.hs @@ -32,8 +32,10 @@ configkey = annexConfig "uuid" {- Generates a UUID. There is a library for this, but it's not packaged, - so use the command line tool. -} genUUID :: IO UUID -genUUID = pOpen ReadFromPipe command params $ liftM toUUID . hGetLine +genUUID = gen . lines <$> readProcess command params where + gen [] = error $ "no output from " ++ command + gen (l:_) = toUUID l command = SysConfig.uuid params -- request a random uuid be generated diff --git a/Assistant.hs b/Assistant.hs index e924d9477..06484b086 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -21,15 +21,35 @@ - until this is complete. - Thread 5: committer - Waits for changes to occur, and runs the git queue to update its - - index, then commits. - - Thread 6: status logger + - index, then commits. Also queues Transfer events to send added + - files to other remotes. + - Thread 6: pusher + - Waits for commits to be made, and pushes updated branches to remotes, + - in parallel. (Forks a process for each git push.) + - Thread 7: push retryer + - Runs every 30 minutes when there are failed pushes, and retries + - them. + - Thread 8: merger + - Waits for pushes to be received from remotes, and merges the + - updated branches into the current branch. + - (This uses inotify on .git/refs/heads, so there are additional + - inotify threads associated with it, too.) + - Thread 9: transfer watcher + - Watches for transfer information files being created and removed, + - and maintains the DaemonStatus currentTransfers map and the + - TransferSlots QSemN. + - (This uses inotify on .git/annex/transfer/, so there are + - additional inotify threads associated with it, too.) + - Thread 10: transferrer + - Waits for Transfers to be queued and does them. + - Thread 11: status logger - Wakes up periodically and records the daemon's status to disk. 
- - Thread 7: sanity checker + - Thread 12: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) - The Annex state is stored here, which allows resuscitating the - - Annex monad in IO actions run by the inotify and committer + - Annex monad in IO actions run by the watcher and committer - threads. Thus, a single state is shared among the threads, and - only one at a time can access it. - DaemonStatusHandle: (MVar) @@ -39,6 +59,20 @@ - ChangeChan: (STM TChan) - Changes are indicated by writing to this channel. The committer - reads from it. + - CommitChan: (STM TChan) + - Commits are indicated by writing to this channel. The pusher reads + - from it. + - FailedPushMap (STM TMVar) + - Failed pushes are indicated by writing to this TMVar. The push + - retrier blocks until they're available. + - TransferQueue (STM TChan) + - Transfers to make are indicated by writing to this channel. + - TransferSlots (QSemN) + - Count of the number of currently available transfer slots. + - Updated by the transfer watcher, this allows other threads + - to block until a slot is available. + - This MVar should only be manipulated from inside the Annex monad, + - which ensures it's accessed only after the ThreadState MVar.
-} module Assistant where @@ -47,39 +81,55 @@ import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes -import Assistant.Watcher -import Assistant.Committer -import Assistant.SanityChecker +import Assistant.Commits +import Assistant.Pushes +import Assistant.TransferQueue +import Assistant.TransferSlots +import Assistant.Threads.Watcher +import Assistant.Threads.Committer +import Assistant.Threads.Pusher +import Assistant.Threads.Merger +import Assistant.Threads.TransferWatcher +import Assistant.Threads.Transferrer +import Assistant.Threads.SanityChecker import qualified Utility.Daemon import Utility.LogFile +import Utility.ThreadScheduler import Control.Concurrent -startDaemon :: Bool -> Annex () -startDaemon foreground +startDaemon :: Bool -> Bool -> Annex () +startDaemon assistant foreground | foreground = do - showStart "watch" "." + showStart (if assistant then "assistant" else "watch") "." go id | otherwise = do logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile pidfile <- fromRepo gitAnnexPidFile go $ Utility.Daemon.daemonize logfd (Just pidfile) False where - go a = withThreadState $ \st -> do + go daemonize = withThreadState $ \st -> do checkCanWatch dstatus <- startDaemonStatus - liftIO $ a $ do - changechan <- newChangeChan - -- The commit thread is started early, - -- so that the user can immediately - -- begin adding files and having them - -- committed, even while the startup scan - -- is taking place. - _ <- forkIO $ commitThread st changechan - _ <- forkIO $ daemonStatusThread st dstatus - _ <- forkIO $ sanityCheckerThread st dstatus changechan - -- Does not return. - watchThread st dstatus changechan + liftIO $ daemonize $ run dstatus st + run dstatus st = do + changechan <- newChangeChan + commitchan <- newCommitChan + pushmap <- newFailedPushMap + transferqueue <- newTransferQueue + transferslots <- newTransferSlots + mapM_ (void . 
forkIO) + [ commitThread st changechan commitchan transferqueue dstatus + , pushThread st dstatus commitchan pushmap + , pushRetryThread st pushmap + , mergeThread st + , transferWatcherThread st dstatus + , transfererThread st dstatus transferqueue transferslots + , daemonStatusThread st dstatus + , sanityCheckerThread st dstatus transferqueue changechan + , watchThread st dstatus transferqueue changechan + ] + waitForTermination stopDaemon :: Annex () stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index 173ba1922..eca922109 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -1,6 +1,8 @@ {- git-annex assistant change tracking - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} module Assistant.Changes where @@ -8,14 +10,14 @@ module Assistant.Changes where import Common.Annex import qualified Annex.Queue import Types.KeySource +import Utility.TSet -import Control.Concurrent.STM import Data.Time.Clock data ChangeType = AddChange | LinkChange | RmChange | RmDirChange deriving (Show, Eq) -type ChangeChan = TChan Change +type ChangeChan = TSet Change data Change = Change @@ -29,11 +31,8 @@ data Change } deriving (Show) -runChangeChan :: STM a -> IO a -runChangeChan = atomically - newChangeChan :: IO ChangeChan -newChangeChan = atomically newTChan +newChangeChan = newTSet {- Handlers call this when they made a change that needs to get committed. -} madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) @@ -65,17 +64,13 @@ finishedChange c = c {- Gets all unhandled changes. - Blocks until at least one change is made. -} getChanges :: ChangeChan -> IO [Change] -getChanges chan = runChangeChan $ do - c <- readTChan chan - go [c] - where - go l = do - v <- tryReadTChan chan - case v of - Nothing -> return l - Just c -> go (c:l) +getChanges = getTSet {- Puts unhandled changes back into the channel. 
- Note: Original order is not preserved. -} refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs +refillChanges = putTSet + +{- Records a change in the channel. -} +recordChange :: ChangeChan -> Change -> IO () +recordChange = putTSet1 diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs new file mode 100644 index 000000000..86fd7599f --- /dev/null +++ b/Assistant/Commits.hs @@ -0,0 +1,34 @@ +{- git-annex assistant commit tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Commits where + +import Utility.TSet + +import Data.Time.Clock + +type CommitChan = TSet Commit + +data Commit = Commit UTCTime + deriving (Show) + +newCommitChan :: IO CommitChan +newCommitChan = newTSet + +{- Gets all unhandled commits. + - Blocks until at least one commit is made. -} +getCommits :: CommitChan -> IO [Commit] +getCommits = getTSet + +{- Puts unhandled commits back into the channel. + - Note: Original order is not preserved. -} +refillCommits :: CommitChan -> [Commit] -> IO () +refillCommits = putTSet + +{- Records a commit in the channel. -} +recordCommit :: CommitChan -> Commit -> IO () +recordCommit = putTSet1 diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index e5ba3d151..64c441cee 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -1,6 +1,8 @@ {- git-annex assistant daemon status - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. 
-} module Assistant.DaemonStatus where @@ -9,12 +11,15 @@ import Common.Annex import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.TempFile +import Logs.Transfer +import qualified Command.Sync import Control.Concurrent import System.Posix.Types import Data.Time.Clock.POSIX import Data.Time import System.Locale +import qualified Data.Map as M data DaemonStatus = DaemonStatus -- False when the daemon is performing its startup scan @@ -25,9 +30,15 @@ data DaemonStatus = DaemonStatus , sanityCheckRunning :: Bool -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime + -- Currently running file content transfers + , currentTransfers :: TransferMap + -- Ordered list of remotes to talk to. + , knownRemotes :: [Remote] } deriving (Show) +type TransferMap = M.Map Transfer TransferInfo + type DaemonStatusHandle = MVar DaemonStatus newDaemonStatus :: DaemonStatus @@ -36,24 +47,33 @@ newDaemonStatus = DaemonStatus , lastRunning = Nothing , sanityCheckRunning = False , lastSanityCheck = Nothing + , currentTransfers = M.empty + , knownRemotes = [] } getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus getDaemonStatus = liftIO . readMVar -modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () -modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a) +modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () +modifyDaemonStatus_ handle a = liftIO $ modifyMVar_ handle (return . a) + +modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b +modifyDaemonStatus handle a = liftIO $ modifyMVar handle (return . a) {- Load any previous daemon status file, and store it in the MVar for this - - process to use as its DaemonStatus. -} + - process to use as its DaemonStatus. Also gets current transfer status. 
-} startDaemonStatus :: Annex DaemonStatusHandle startDaemonStatus = do file <- fromRepo gitAnnexDaemonStatusFile status <- liftIO $ catchDefaultIO (readDaemonStatusFile file) newDaemonStatus + transfers <- M.fromList <$> getTransfers + remotes <- Command.Sync.syncRemotes [] liftIO $ newMVar status { scanComplete = False , sanityCheckRunning = False + , currentTransfers = transfers + , knownRemotes = remotes } {- This thread wakes up periodically and writes the daemon status to disk. -} @@ -117,3 +137,18 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) tenMinutes :: Int tenMinutes = 10 * 60 + +{- Mutates the transfer map. -} +adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex () +adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $ + \s -> s { currentTransfers = a (currentTransfers s) } + +{- Removes a transfer from the map, and returns its info. -} +removeTransfer :: DaemonStatusHandle -> Transfer -> Annex (Maybe TransferInfo) +removeTransfer dstatus t = modifyDaemonStatus dstatus go + where + go s = + let (info, ts) = M.updateLookupWithKey + (\_k _v -> Nothing) + t (currentTransfers s) + in (s { currentTransfers = ts }, info) diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs new file mode 100644 index 000000000..f411dda07 --- /dev/null +++ b/Assistant/Pushes.hs @@ -0,0 +1,46 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pushes where + +import Common.Annex + +import Control.Concurrent.STM +import Data.Time.Clock +import qualified Data.Map as M + +{- Track the most recent push failure for each remote. -} +type PushMap = M.Map Remote UTCTime +type FailedPushMap = TMVar PushMap + +{- The TMVar starts empty, and is left empty when there are no + - failed pushes. This way we can block until there are some failed pushes. 
+ -} +newFailedPushMap :: IO FailedPushMap +newFailedPushMap = atomically newEmptyTMVar + +{- Blocks until there are failed pushes. + - Returns Remotes whose pushes failed a given time duration or more ago. + - (This may be an empty list.) -} +getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote] +getFailedPushesBefore v duration = do + m <- atomically $ readTMVar v + now <- getCurrentTime + return $ M.keys $ M.filter (not . toorecent now) m + where + toorecent now time = now `diffUTCTime` time < duration + +{- Modifies the map. -} +changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO () +changeFailedPushMap v a = atomically $ + store . a . fromMaybe M.empty =<< tryTakeTMVar v + where + {- tryTakeTMVar empties the TMVar; refill it only if + - the modified map is not itself empty -} + store m + | m == M.empty = noop + | otherwise = putTMVar v $! m diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs index 51f579d07..1decd8e91 100644 --- a/Assistant/ThreadedMonad.hs +++ b/Assistant/ThreadedMonad.hs @@ -1,17 +1,17 @@ {- making the Annex monad available across threads - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE BangPatterns #-} - module Assistant.ThreadedMonad where import Common.Annex import qualified Annex import Control.Concurrent -import Control.Exception (throw) +import Data.Tuple {- The Annex state is stored in a MVar, so that threaded actions can access - it. -} @@ -32,13 +32,18 @@ withThreadState a = do {- Runs an Annex action, using the state from the MVar. - - - This serializes calls by threads. -} + - This serializes calls by threads; only one thread can run in Annex at a + - time. 
-} runThreadState :: ThreadState -> Annex a -> IO a -runThreadState mvar a = do - startstate <- takeMVar mvar - -- catch IO errors and rethrow after restoring the MVar - !(r, newstate) <- catchIO (Annex.run startstate a) $ \e -> do - putMVar mvar startstate - throw e - putMVar mvar newstate - return r +runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a + +{- Runs an Annex action, using a copy of the state from the MVar. + - + - It's up to the action to perform any necessary shutdown tasks in order + - for state to not be lost. And it's up to the caller to resynchronise + - with any changes the action makes to eg, the git-annex branch. + -} +unsafeRunThreadState :: ThreadState -> Annex a -> IO () +unsafeRunThreadState mvar a = do + state <- readMVar mvar + void $ Annex.eval state a diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs index 63df8cafc..ff5cc9eab 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -1,14 +1,20 @@ {- git-annex assistant commit thread - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.Committer where +module Assistant.Threads.Committer where import Common.Annex import Assistant.Changes +import Assistant.Commits import Assistant.ThreadedMonad -import Assistant.Watcher +import Assistant.Threads.Watcher +import Assistant.TransferQueue +import Assistant.DaemonStatus +import Logs.Transfer import qualified Annex import qualified Annex.Queue import qualified Git.Command @@ -26,8 +32,8 @@ import qualified Data.Set as S import Data.Either {- This thread makes git commits at appropriate times. 
-} -commitThread :: ThreadState -> ChangeChan -> IO () -commitThread st changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO () +commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. @@ -36,10 +42,11 @@ commitThread st changechan = runEvery (Seconds 1) $ do time <- getCurrentTime if shouldCommit time changes then do - readychanges <- handleAdds st changechan changes + readychanges <- handleAdds st changechan transferqueue dstatus changes if shouldCommit time readychanges then do void $ tryIO $ runThreadState st commitStaged + recordCommit commitchan (Commit time) else refillChanges changechan readychanges else refillChanges changechan changes @@ -93,8 +100,8 @@ shouldCommit now changes - Any pending adds that are not ready yet are put back into the ChangeChan, - where they will be retried later. 
-} -handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change] -handleAdds st changechan cs = returnWhen (null pendingadds) $ do +handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] +handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do (postponed, toadd) <- partitionEithers <$> safeToAdd st pendingadds @@ -106,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do if (DirWatcher.eventsCoalesce || null added) then return $ added ++ otherchanges else do - r <- handleAdds st changechan + r <- handleAdds st changechan transferqueue dstatus =<< getChanges changechan return $ r ++ added ++ otherchanges where @@ -117,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do | otherwise = a add :: Change -> IO (Maybe Change) - add change@(PendingAddChange { keySource = ks }) = do - r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do - showStart "add" $ keyFilename ks - handle (finishedChange change) (keyFilename ks) - =<< Command.Add.ingest ks - return $ maybeMaybe r + add change@(PendingAddChange { keySource = ks }) = + liftM maybeMaybe $ catchMaybeIO $ + sanitycheck ks $ runThreadState st $ do + showStart "add" $ keyFilename ks + key <- Command.Add.ingest ks + handle (finishedChange change) (keyFilename ks) key add _ = return Nothing maybeMaybe (Just j@(Just _)) = j @@ -137,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha + queueTransfers transferqueue dstatus key (Just file) Upload showEndOk return $ Just change diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs new file mode 100644 index 000000000..c7da86a8d --- /dev/null +++ b/Assistant/Threads/Merger.hs @@ -0,0 +1,87 @@ +{- git-annex assistant git merge thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL 
version 3 or higher. + -} + +module Assistant.Threads.Merger where + +import Common.Annex +import Assistant.ThreadedMonad +import Utility.DirWatcher +import Utility.Types.DirWatcher +import qualified Annex.Branch +import qualified Git +import qualified Git.Command +import qualified Git.Merge +import qualified Git.Branch +import qualified Command.Sync +import qualified Remote + +{- This thread watches for changes to .git/refs/heads/synced/*, + - which indicate incoming pushes. It merges those pushes into the + - currently checked out branch. -} +mergeThread :: ThreadState -> IO () +mergeThread st = do + g <- runThreadState st $ fromRepo id + let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced" + createDirectoryIfMissing True dir + let hook a = Just $ runHandler g a + let hooks = mkWatchHooks + { addHook = hook onAdd + , errHook = hook onErr + } + void $ watchDir dir (const False) hooks id + +type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler g handler file filestatus = void $ do + either print (const noop) =<< tryIO go + where + go = handler g file filestatus + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr _ msg _ = error msg + +{- Called when a new branch ref is written. + - + - This relies on git's atomic method of updating branch ref files, + - which is to first write the new file to .lock, and then rename it + - over the old file. So, ignore .lock files, and the rename ensures + - the watcher sees a new file being added on each update. + - + - At startup, synthetic add events fire, causing this to run, but that's + - ok; it ensures that any changes pushed since the last time the assistant + - ran are merged in. 
+ -} +onAdd :: Handler +onAdd g file _ + | ".lock" `isSuffixOf` file = noop + | otherwise = do + let changedbranch = Git.Ref $ + "refs" </> "heads" </> takeFileName file + current <- Git.Branch.current g + when (Just changedbranch == current) $ + void $ mergeBranch changedbranch g + +mergeBranch :: Git.Ref -> Git.Repo -> IO Bool +mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch + +{- Manually pull from remotes and merge their branches. Called by the pusher + - when a push fails, which can happen due to a remote not having pushed + - changes to us. That could be because it doesn't have us as a remote, or + - because the assistant is not running there, or other reasons. -} +manualPull :: Git.Ref -> [Remote] -> Annex () +manualPull currentbranch remotes = do + forM_ remotes $ \r -> + inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r] + Annex.Branch.forceUpdate + forM_ remotes $ \r -> + Command.Sync.mergeRemote r currentbranch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs new file mode 100644 index 000000000..6d6836120 --- /dev/null +++ b/Assistant/Threads/Pusher.hs @@ -0,0 +1,90 @@ +{- git-annex assistant git pushing threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Pusher where + +import Common.Annex +import Assistant.Commits +import Assistant.Pushes +import Assistant.DaemonStatus +import Assistant.ThreadedMonad +import Assistant.Threads.Merger +import qualified Command.Sync +import Utility.ThreadScheduler +import Utility.Parallel + +import Data.Time.Clock +import qualified Data.Map as M + +{- This thread retries pushes that failed before. -} +pushRetryThread :: ThreadState -> FailedPushMap -> IO () +pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do + -- We already waited half an hour, now wait until there are failed + -- pushes to retry. 
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) + unless (null topush) $ do + now <- getCurrentTime + pushToRemotes now st pushmap topush + where + halfhour = 1800 + +{- This thread pushes git commits out to remotes soon after they are made. -} +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO () +pushThread st daemonstatus commitchan pushmap = do + runEvery (Seconds 2) $ do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + commits <- getCommits commitchan + -- Now see if now's a good time to push. + now <- getCurrentTime + if shouldPush now commits + then do + remotes <- runThreadState st $ + knownRemotes <$> getDaemonStatus daemonstatus + pushToRemotes now st pushmap remotes + else refillCommits commitchan commits + +{- Decide if now is a good time to push to remotes. + - + - Current strategy: Immediately push all commits. The commit machinery + - already determines batches of changes, so we can't easily determine + - batches better. + -} +shouldPush :: UTCTime -> [Commit] -> Bool +shouldPush _now commits + | not (null commits) = True + | otherwise = False + +{- Updates the local sync branch, then pushes it to all remotes, in + - parallel. + - + - Avoids running possibly long-duration commands in the Annex monad, so + - as not to block other threads. 
-} +pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO () +pushToRemotes now st pushmap remotes = do + (g, branch) <- runThreadState st $ + (,) <$> fromRepo id <*> Command.Sync.currentBranch + go True branch g remotes + where + go shouldretry branch g rs = do + Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + (succeeded, failed) <- inParallel (push g branch) rs + changeFailedPushMap pushmap $ \m -> + M.union (makemap failed) $ + M.difference m (makemap succeeded) + unless (null failed || not shouldretry) $ + retry branch g failed + + makemap l = M.fromList $ zip l (repeat now) + + push g branch remote = + ifM (Command.Sync.pushBranch remote branch g) + ( exitSuccess, exitFailure) + + retry branch g rs = do + runThreadState st $ manualPull branch rs + go False branch g rs diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index e2ca9da74..c5b99863e 100644 --- a/Assistant/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -1,9 +1,11 @@ {- git-annex assistant sanity checker - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.SanityChecker ( +module Assistant.Threads.SanityChecker ( sanityCheckerThread ) where @@ -12,26 +14,27 @@ import qualified Git.LsFiles import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Changes +import Assistant.TransferQueue import Utility.ThreadScheduler -import qualified Assistant.Watcher +import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX {- This thread wakes up occasionally to make sure the tree is in good shape. 
-} -sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -sanityCheckerThread st status changechan = forever $ do +sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () +sanityCheckerThread st status transferqueue changechan = forever $ do waitForNextCheck st status runThreadState st $ - modifyDaemonStatus status $ \s -> s + modifyDaemonStatus_ status $ \s -> s { sanityCheckRunning = True } now <- getPOSIXTime -- before check started - catchIO (check st status changechan) + catchIO (check st status transferqueue changechan) (runThreadState st . warning . show) runThreadState st $ do - modifyDaemonStatus status $ \s -> s + modifyDaemonStatus_ status $ \s -> s { sanityCheckRunning = False , lastSanityCheck = Just now } @@ -56,8 +59,8 @@ oneDay = 24 * 60 * 60 {- It's important to stay out of the Annex monad as much as possible while - running potentially expensive parts of this check, since remaining in it - will block the watcher. 
-} -check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -check st status changechan = do +check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () +check st status transferqueue changechan = do g <- runThreadState st $ do showSideAction "Running daily check" fromRepo id @@ -77,5 +80,5 @@ check st status changechan = do insanity m = runThreadState st $ warning m addsymlink file s = do insanity $ "found unstaged symlink: " ++ file - Assistant.Watcher.runHandler st status changechan - Assistant.Watcher.onAddSymlink file s + Watcher.runHandler st status transferqueue changechan + Watcher.onAddSymlink file s diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs new file mode 100644 index 000000000..364ce0468 --- /dev/null +++ b/Assistant/Threads/TransferWatcher.hs @@ -0,0 +1,66 @@ +{- git-annex assistant transfer watching thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.TransferWatcher where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Logs.Transfer +import Utility.DirWatcher +import Utility.Types.DirWatcher + +import Data.Map as M + +{- This thread watches for changes to the gitAnnexTransferDir, + - and updates the DaemonStatus's map of ongoing transfers. -} +transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () +transferWatcherThread st dstatus = do + g <- runThreadState st $ fromRepo id + let dir = gitAnnexTransferDir g + createDirectoryIfMissing True dir + let hook a = Just $ runHandler st dstatus a + let hooks = mkWatchHooks + { addHook = hook onAdd + , delHook = hook onDel + , errHook = hook onErr + } + void $ watchDir dir (const False) hooks id + +type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO () + +{- Runs an action handler. 
+ - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus handler file filestatus = void $ do + either print (const noop) =<< tryIO go + where + go = handler st dstatus file filestatus + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr _ _ msg _ = error msg + +{- Called when a new transfer information file is written. -} +onAdd :: Handler +onAdd st dstatus file _ = case parseTransferFile file of + Nothing -> noop + Just t -> runThreadState st $ go t =<< checkTransfer t + where + go _ Nothing = noop -- transfer already finished + go t (Just info) = adjustTransfers dstatus $ + M.insertWith' merge t info + -- preserve transferTid, which is not written to disk + merge new old = new { transferTid = transferTid old } + +{- Called when a transfer information file is removed. -} +onDel :: Handler +onDel st dstatus file _ = case parseTransferFile file of + Nothing -> noop + Just t -> void $ runThreadState st $ removeTransfer dstatus t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs new file mode 100644 index 000000000..c439d8b7e --- /dev/null +++ b/Assistant/Threads/Transferrer.hs @@ -0,0 +1,104 @@ +{- git-annex assistant data transferrer thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Transferrer where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.TransferQueue +import Assistant.TransferSlots +import Logs.Transfer +import Logs.Presence +import Logs.Location +import Annex.Content +import qualified Remote + +import Data.Time.Clock.POSIX +import Data.Time.Clock +import qualified Data.Map as M + +{- For now only one transfer is run at a time.
-} +maxTransfers :: Int +maxTransfers = 1 + +{- Dispatches transfers from the queue. -} +transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO () +transfererThread st dstatus transferqueue slots = go + where + go = do + (t, info) <- getNextTransfer transferqueue + whenM (runThreadState st $ shouldTransfer dstatus t info) $ + runTransfer st dstatus slots t info + go + +{- Checks if the requested transfer is already running, or + - the file to download is already present, or the remote + - being uploaded to isn't known to have the file. -} +shouldTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> Annex Bool +shouldTransfer dstatus t info = + go =<< currentTransfers <$> getDaemonStatus dstatus + where + go m + | M.member t m = return False + | transferDirection t == Download = + not <$> inAnnex key + | transferDirection t == Upload = + {- Trust the location log to check if the + - remote already has the key. This avoids + - a roundtrip to the remote. -} + case transferRemote info of + Nothing -> return False + Just remote -> + notElem (Remote.uuid remote) + <$> loggedLocations key + | otherwise = return False + key = transferKey t + +{- A transfer is run in a separate thread, with a *copy* of the Annex + - state. This is necessary to avoid blocking the rest of the assistant + - on the transfer completing, and also to allow multiple transfers to run + - at once. This requires GHC's threaded runtime to work! + - + - The copy of state means that the transfer processes are responsible + - for doing any necessary shutdown cleanups, and that the parent + - thread's cache must be invalidated once a transfer completes, as + - changes may have been made to the git-annex branch. 
+ -} +runTransfer :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> IO () +runTransfer st dstatus slots t info = case (transferRemote info, associatedFile info) of + (Nothing, _) -> noop + (_, Nothing) -> noop + (Just remote, Just file) -> do + tid <- inTransferSlot slots st $ + transferprocess remote file + now <- getCurrentTime + runThreadState st $ adjustTransfers dstatus $ + M.insertWith' const t info + { startedTime = Just $ utcTimeToPOSIXSeconds now + , transferTid = Just tid + } + where + isdownload = transferDirection t == Download + tofrom + | isdownload = "from" + | otherwise = "to" + key = transferKey t + + transferprocess remote file = do + showStart "copy" file + showAction $ tofrom ++ " " ++ Remote.name remote + ok <- transfer t (Just file) $ + if isdownload + then getViaTmp key $ + Remote.retrieveKeyFile remote key (Just file) + else do + ok <- Remote.storeKey remote key $ Just file + when ok $ + Remote.logStatus remote key InfoPresent + return ok + showEndResult ok diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs index db58f01e8..9f0eba74e 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -7,12 +7,14 @@ {-# LANGUAGE CPP #-} -module Assistant.Watcher where +module Assistant.Threads.Watcher where import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes +import Assistant.TransferQueue +import Logs.Transfer import Utility.DirWatcher import Utility.Types.DirWatcher import qualified Annex @@ -27,7 +29,6 @@ import Annex.Content import Annex.CatFile import Git.Types -import Control.Concurrent.STM import Data.Bits.Utils import qualified Data.ByteString.Lazy as L @@ -46,11 +47,11 @@ needLsof = error $ unlines , "Be warned: This can corrupt data in the annex, and make fsck complain." ] -watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -watchThread st dstatus changechan = watchDir "." 
ignored hooks startup +watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO () +watchThread st dstatus transferqueue changechan = void $ watchDir "." ignored hooks startup where startup = statupScan st dstatus - hook a = Just $ runHandler st dstatus changechan a + hook a = Just $ runHandler st dstatus transferqueue changechan a hooks = WatchHooks { addHook = hook onAdd , delHook = hook onDel @@ -66,7 +67,7 @@ statupScan st dstatus scanner = do showAction "scanning" r <- scanner runThreadState st $ - modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } + modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True } -- Notice any files that were deleted before watching was started. runThreadState st $ do @@ -83,23 +84,22 @@ ignored = ig . takeFileName ig ".gitattributes" = True ig _ = False -type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) +type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) {- Runs an action handler, inside the Annex monad, and if there was a - change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. 
-} -runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus changechan handler file filestatus = void $ do +runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus transferqueue changechan handler file filestatus = void $ do r <- tryIO go case r of Left e -> print e Right Nothing -> noop - Right (Just change) -> void $ - runChangeChan $ writeTChan changechan change + Right (Just change) -> recordChange changechan change where - go = runThreadState st $ handler file filestatus dstatus + go = runThreadState st $ handler file filestatus dstatus transferqueue {- During initial directory scan, this will be run for any regular files - that are already checked into git. We don't want to turn those into @@ -120,7 +120,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do - the add. -} onAdd :: Handler -onAdd file filestatus dstatus +onAdd file filestatus dstatus _ | maybe False isRegularFile filestatus = do ifM (scanComplete <$> getDaemonStatus dstatus) ( go @@ -138,12 +138,15 @@ onAdd file filestatus dstatus - before adding it. -} onAddSymlink :: Handler -onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file +onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile file where go (Just (key, _)) = do link <- calcGitLink file key ifM ((==) link <$> liftIO (readSymbolicLink file)) - ( ensurestaged link =<< getDaemonStatus dstatus + ( do + s <- getDaemonStatus dstatus + checkcontent key s + ensurestaged link s , do liftIO $ removeFile file liftIO $ createSymbolicLink link file @@ -185,8 +188,16 @@ onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file stageSymlink file sha madeChange file LinkChange + {- When a new link appears, after the startup scan, + - try to get the key's content. 
-} + checkcontent key daemonstatus + | scanComplete daemonstatus = unlessM (inAnnex key) $ + queueTransfers transferqueue dstatus + key (Just file) Download + | otherwise = noop + onDel :: Handler -onDel file _ _dstatus = do +onDel file _ _dstatus _ = do Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.unstageFile file) madeChange file RmChange @@ -199,14 +210,14 @@ onDel file _ _dstatus = do - command to get the recursive list of files in the directory, so rm is - just as good. -} onDelDir :: Handler -onDelDir dir _ _dstatus = do +onDelDir dir _ _dstatus _ = do Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] madeChange dir RmDirChange {- Called when there's an error with inotify. -} onErr :: Handler -onErr msg _ _dstatus = do +onErr msg _ _dstatus _ = do warning msg return Nothing diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs new file mode 100644 index 000000000..b0eca96c8 --- /dev/null +++ b/Assistant/TransferQueue.hs @@ -0,0 +1,75 @@ +{- git-annex assistant pending transfer queue + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.TransferQueue where + +import Common.Annex +import Assistant.DaemonStatus +import Logs.Transfer +import Types.Remote +import qualified Remote + +import Control.Concurrent.STM + +type TransferQueue = TChan (Transfer, TransferInfo) + +newTransferQueue :: IO TransferQueue +newTransferQueue = atomically newTChan + +stubInfo :: AssociatedFile -> TransferInfo +stubInfo f = TransferInfo + { startedTime = Nothing + , transferPid = Nothing + , transferTid = Nothing + , transferRemote = Nothing + , bytesComplete = Nothing + , associatedFile = f + } + +{- Adds pending transfers to the end of the queue for some of the known + - remotes. 
-} +queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers q daemonstatus k f direction = do + rs <- knownRemotes <$> getDaemonStatus daemonstatus + mapM_ (\r -> queue r $ gentransfer r) + =<< sufficientremotes rs + where + sufficientremotes l + -- Queue downloads from all remotes that + -- have the key, with the cheapest ones first. + -- More expensive ones will only be tried if + -- downloading from a cheap one fails. + | direction == Download = do + uuids <- Remote.keyLocations k + return $ filter (\r -> uuid r `elem` uuids) l + -- TODO: Determine a smaller set of remotes that + -- can be uploaded to, in order to ensure all + -- remotes can access the content. Currently, + -- send to every remote we can. + | otherwise = return l + gentransfer r = Transfer + { transferDirection = direction + , transferKey = k + , transferUUID = Remote.uuid r + } + queue r t = liftIO $ void $ atomically $ do + let info = (stubInfo f) { transferRemote = Just r } + writeTChan q (t, info) + +{- Adds a pending transfer to the end of the queue. -} +queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransfer q f t = void $ atomically $ + writeTChan q (t, stubInfo f) + +{- Adds a pending transfer to the start of the queue, to be processed next. -} +queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () +queueNextTransfer q f t = void $ atomically $ + unGetTChan q (t, stubInfo f) + +{- Blocks until a pending transfer is available in the queue. -} +getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) +getNextTransfer = atomically . readTChan diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs new file mode 100644 index 000000000..dc077254d --- /dev/null +++ b/Assistant/TransferSlots.hs @@ -0,0 +1,40 @@ +{- git-annex assistant transfer slots + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Assistant.TransferSlots where + +import Control.Exception +import Control.Concurrent + +import Common.Annex +import Assistant.ThreadedMonad + +type TransferSlots = QSemN + +{- Number of concurrent transfers allowed to be run from the assistant. + - + - Transfers launched by other means, including by remote assistants, + - do not currently take up slots. + -} +numSlots :: Int +numSlots = 1 + +newTransferSlots :: IO TransferSlots +newTransferSlots = newQSemN numSlots + +{- Waits until a transfer slot becomes available, and runs a transfer + - action in the slot, in its own thread. -} +inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId +inTransferSlot s st a = forkIO $ bracket_ start done run + where + start = waitQSemN s 1 + done = transferComplete s + run = unsafeRunThreadState st a + +{- Call when a transfer is complete. -} +transferComplete :: TransferSlots -> IO () +transferComplete s = signalQSemN s 1 diff --git a/Backend/SHA.hs b/Backend/SHA.hs index cf61139e0..bb400a768 100644 --- a/Backend/SHA.hs +++ b/Backend/SHA.hs @@ -53,14 +53,16 @@ shaN shasize file filesize = do showAction "checksum" case shaCommand shasize filesize of Left sha -> liftIO $ sha <$> L.readFile file - Right command -> liftIO $ runcommand command + Right command -> liftIO $ parse command . lines <$> + readProcess command (toCommand [File file]) where - runcommand command = - pOpen ReadFromPipe command (toCommand [File file]) $ \h -> do - sha <- fst . 
separate (== ' ') <$> hGetLine h - if null sha - then error $ command ++ " parse error" - else return sha + parse command [] = bad command + parse command (l:_) + | null sha = bad command + | otherwise = sha + where + sha = fst $ separate (== ' ') l + bad command = error $ command ++ " parse error" shaCommand :: SHASize -> Integer -> Either (L.ByteString -> String) String shaCommand shasize filesize diff --git a/Build/Configure.hs b/Build/Configure.hs index cf6623b22..9468e1704 100644 --- a/Build/Configure.hs +++ b/Build/Configure.hs @@ -4,7 +4,7 @@ module Build.Configure where import System.Directory import Data.List -import System.Cmd.Utils +import System.Process import Control.Applicative import System.FilePath @@ -71,7 +71,7 @@ getVersionString = do getGitVersion :: Test getGitVersion = do - (_, s) <- pipeFrom "git" ["--version"] + s <- readProcess "git" ["--version"] "" let version = unwords $ drop 2 $ words $ head $ lines s return $ Config "gitversion" (StringConfig version) diff --git a/Command/Assistant.hs b/Command/Assistant.hs new file mode 100644 index 000000000..60eac5d21 --- /dev/null +++ b/Command/Assistant.hs @@ -0,0 +1,18 @@ +{- git-annex assistant + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Command.Assistant where + +import Command +import qualified Command.Watch + +def :: [Command] +def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $ + command "assistant" paramNothing seek "automatically handle changes"] + +seek :: [CommandSeek] +seek = Command.Watch.mkSeek True diff --git a/Command/Fsck.hs b/Command/Fsck.hs index 10cca489b..0e3cc934c 100644 --- a/Command/Fsck.hs +++ b/Command/Fsck.hs @@ -7,6 +7,8 @@ module Command.Fsck where +import System.Posix.Process (getProcessID) + import Common.Annex import Command import qualified Annex diff --git a/Command/Map.hs b/Command/Map.hs index 0773f6828..3dbdadbd6 100644 --- a/Command/Map.hs +++ b/Command/Map.hs @@ -199,8 +199,10 @@ tryScan r Left _ -> return Nothing Right r' -> return $ Just r' pipedconfig cmd params = safely $ - pOpen ReadFromPipe cmd (toCommand params) $ + withHandle StdoutHandle createProcessSuccess p $ Git.Config.hRead r + where + p = proc cmd $ toCommand params configlist = onRemote r (pipedconfig, Nothing) "configlist" [] [] diff --git a/Command/Sync.hs b/Command/Sync.hs index bdb5d47a7..dfaed5949 100644 --- a/Command/Sync.hs +++ b/Command/Sync.hs @@ -39,7 +39,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote)) -- syncing involves several operations, any of which can independently fail seek :: CommandSeek seek rs = do - !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + branch <- currentBranch remotes <- syncRemotes rs return $ concat [ [ commit ] @@ -49,6 +49,11 @@ seek rs = do , [ pushLocal branch ] , [ pushRemote remote branch | remote <- remotes ] ] + +currentBranch :: Annex Git.Ref +currentBranch = do + !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + return branch where nobranch = error "no branch is checked out" @@ -98,7 +103,7 @@ mergeLocal branch = go =<< needmerge syncbranch = syncBranch branch needmerge = do unlessM (inRepo $ Git.Ref.exists syncbranch) $ - updateBranch syncbranch + inRepo $ 
updateBranch syncbranch inRepo $ Git.Branch.changed branch syncbranch go False = stop go True = do @@ -107,17 +112,17 @@ mergeLocal branch = go =<< needmerge pushLocal :: Git.Ref -> CommandStart pushLocal branch = do - updateBranch $ syncBranch branch + inRepo $ updateBranch $ syncBranch branch stop -updateBranch :: Git.Ref -> Annex () -updateBranch syncbranch = +updateBranch :: Git.Ref -> Git.Repo -> IO () +updateBranch syncbranch g = unlessM go $ error $ "failed to update " ++ show syncbranch where - go = inRepo $ Git.Command.runBool "branch" + go = Git.Command.runBool "branch" [ Param "-f" , Param $ show $ Git.Ref.base syncbranch - ] + ] g pullRemote :: Remote -> Git.Ref -> CommandStart pullRemote remote branch = do @@ -143,19 +148,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge) pushRemote :: Remote -> Git.Ref -> CommandStart pushRemote remote branch = go =<< needpush where - needpush = anyM (newer remote) [syncbranch, Annex.Branch.name] + needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name] go False = stop go True = do showStart "push" (Remote.name remote) next $ next $ do showOutput - inRepo $ Git.Command.runBool "push" - [ Param (Remote.name remote) - , Param (show Annex.Branch.name) - , Param refspec - ] - refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch) - syncbranch = syncBranch branch + inRepo $ pushBranch remote branch + +pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool +pushBranch remote branch g = + Git.Command.runBool "push" + [ Param (Remote.name remote) + , Param (show Annex.Branch.name) + , Param refspec + ] g + where + refspec = concat + [ show $ Git.Ref.base branch + , ":" + , show $ Git.Ref.base $ syncBranch branch + ] mergeAnnex :: CommandStart mergeAnnex = do diff --git a/Command/Watch.hs b/Command/Watch.hs index 5681b3861..744844c4d 100644 --- a/Command/Watch.hs +++ b/Command/Watch.hs @@ -1,6 +1,3 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE BangPatterns #-} - {- git-annex 
watch command - - Copyright 2012 Joey Hess <joey@kitenet.net> @@ -19,10 +16,13 @@ def :: [Command] def = [withOptions [foregroundOption, stopOption] $ command "watch" paramNothing seek "watch for changes"] -seek :: [CommandSeek] -seek = [withFlag stopOption $ \stopdaemon -> +mkSeek :: Bool -> [CommandSeek] +mkSeek assistant = [withFlag stopOption $ \stopdaemon -> withFlag foregroundOption $ \foreground -> - withNothing $ start foreground stopdaemon] + withNothing $ start assistant foreground stopdaemon] + +seek :: [CommandSeek] +seek = mkSeek False foregroundOption :: Option foregroundOption = Option.flag [] "foreground" "do not daemonize" @@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize" stopOption :: Option stopOption = Option.flag [] "stop" "stop daemon" -start :: Bool -> Bool -> CommandStart -start foreground stopdaemon = notBareRepo $ do +start :: Bool -> Bool -> Bool -> CommandStart +start assistant foreground stopdaemon = notBareRepo $ do if stopdaemon then stopDaemon - else startDaemon foreground -- does not return + else startDaemon assistant foreground -- does not return stop @@ -13,16 +13,15 @@ import Data.String.Utils as X import System.Path as X import System.FilePath as X import System.Directory as X -import System.Cmd.Utils as X hiding (safeSystem) import System.IO as X hiding (FilePath) import System.Posix.Files as X import System.Posix.IO as X -import System.Posix.Process as X hiding (executeFile) import System.Exit as X import Utility.Misc as X import Utility.Exception as X import Utility.SafeCommand as X +import Utility.Process as X import Utility.Path as X import Utility.Directory as X import Utility.Monad as X @@ -56,7 +56,7 @@ remoteCost r def = do cmd <- getRemoteConfig r "cost-command" "" (fromMaybe def . 
readish) <$> if not $ null cmd - then liftIO $ snd <$> pipeFrom "sh" ["-c", cmd] + then liftIO $ readProcess "sh" ["-c", cmd] else getRemoteConfig r "cost" "" cheapRemoteCost :: Int @@ -116,4 +116,4 @@ getHttpHeaders = do cmd <- getConfig (annexConfig "http-headers-command") "" if null cmd then fromRepo $ Git.Config.getList "annex.http-headers" - else lines . snd <$> liftIO (pipeFrom "sh" ["-c", cmd]) + else lines <$> liftIO (readProcess "sh" ["-c", cmd]) diff --git a/Git/Branch.hs b/Git/Branch.hs index 6edc1c306..4d239d8fc 100644 --- a/Git/Branch.hs +++ b/Git/Branch.hs @@ -73,12 +73,10 @@ commit :: String -> Branch -> [Ref] -> Repo -> IO Sha commit message branch parentrefs repo = do tree <- getSha "write-tree" $ pipeRead [Param "write-tree"] repo - sha <- getSha "commit-tree" $ - ignorehandle $ pipeWriteRead - (map Param $ ["commit-tree", show tree] ++ ps) - message repo + sha <- getSha "commit-tree" $ pipeWriteRead + (map Param $ ["commit-tree", show tree] ++ ps) + message repo run "update-ref" [Param $ show branch, Param $ show sha] repo return sha where - ignorehandle a = snd <$> a ps = concatMap (\r -> ["-p", show r]) parentrefs diff --git a/Git/Command.hs b/Git/Command.hs index 35f0838ba..cd6c98d33 100644 --- a/Git/Command.hs +++ b/Git/Command.hs @@ -7,10 +7,7 @@ module Git.Command where -import qualified Data.Text.Lazy as L -import qualified Data.Text.Lazy.IO as L -import Control.Concurrent -import Control.Exception (finally) +import System.Posix.Process (getAnyProcessStatus) import Common import Git @@ -43,30 +40,19 @@ run subcommand params repo = assertLocal repo $ - result unless reap is called. -} pipeRead :: [CommandParam] -> Repo -> IO String -pipeRead params repo = assertLocal repo $ do - (_, h) <- hPipeFrom "git" $ toCommand $ gitCommandLine params repo - fileEncoding h - hGetContents h - -{- Runs a git subcommand, feeding it input. - - You should call either getProcessStatus or forceSuccess on the PipeHandle. 
-} -pipeWrite :: [CommandParam] -> L.Text -> Repo -> IO PipeHandle -pipeWrite params s repo = assertLocal repo $ do - (p, h) <- hPipeTo "git" (toCommand $ gitCommandLine params repo) - L.hPutStr h s - hClose h - return p +pipeRead params repo = assertLocal repo $ + withHandle StdoutHandle createBackgroundProcess p $ \h -> do + fileEncoding h + hGetContents h + where + p = proc "git" $ toCommand $ gitCommandLine params repo -{- Runs a git subcommand, feeding it input, and returning its output. - - You should call either getProcessStatus or forceSuccess on the PipeHandle. -} -pipeWriteRead :: [CommandParam] -> String -> Repo -> IO (PipeHandle, String) -pipeWriteRead params s repo = assertLocal repo $ do - (p, from, to) <- hPipeBoth "git" (toCommand $ gitCommandLine params repo) - fileEncoding to - fileEncoding from - _ <- forkIO $ finally (hPutStr to s) (hClose to) - c <- hGetContents from - return (p, c) +{- Runs a git subcommand, feeding it input, and returning its output, + - which is expected to be fairly small, since it's all read into memory + - strictly. -} +pipeWriteRead :: [CommandParam] -> String -> Repo -> IO String +pipeWriteRead params s repo = assertLocal repo $ + writeReadProcess "git" (toCommand $ gitCommandLine params repo) s {- Reads null terminated output of a git command (as enabled by the -z - parameter), and splits it. -} diff --git a/Git/Config.hs b/Git/Config.hs index c9e4f9a2d..c82d6bb1b 100644 --- a/Git/Config.hs +++ b/Git/Config.hs @@ -9,6 +9,7 @@ module Git.Config where import qualified Data.Map as M import Data.Char +import System.Process (cwd) import Common import Git @@ -39,7 +40,7 @@ reRead :: Repo -> IO Repo reRead = read' {- Cannot use pipeRead because it relies on the config having been already - - read. Instead, chdir to the repo. + - read. Instead, chdir to the repo and run git config. 
-} read' :: Repo -> IO Repo read' repo = go repo @@ -47,9 +48,11 @@ read' repo = go repo go Repo { location = Local { gitdir = d } } = git_config d go Repo { location = LocalUnknown d } = git_config d go _ = assertLocal repo $ error "internal" - git_config d = bracketCd d $ - pOpen ReadFromPipe "git" ["config", "--null", "--list"] $ - hRead repo + git_config d = withHandle StdoutHandle createProcessSuccess p $ + hRead repo + where + params = ["config", "--null", "--list"] + p = (proc "git" params) { cwd = Just d } {- Reads git config from a handle and populates a repo with it. -} hRead :: Repo -> Handle -> IO Repo diff --git a/Git/HashObject.hs b/Git/HashObject.hs index 9f37de5ba..c90c9ec3d 100644 --- a/Git/HashObject.hs +++ b/Git/HashObject.hs @@ -38,11 +38,9 @@ hashFile h file = CoProcess.query h send receive {- Injects some content into git, returning its Sha. -} hashObject :: ObjectType -> String -> Repo -> IO Sha hashObject objtype content repo = getSha subcmd $ do - (h, s) <- pipeWriteRead (map Param params) content repo - length s `seq` do - forceSuccess h - reap -- XXX unsure why this is needed - return s + s <- pipeWriteRead (map Param params) content repo + reap -- XXX unsure why this is needed, of if it is anymore + return s where subcmd = "hash-object" params = [subcmd, "-t", show objtype, "-w", "--stdin"] diff --git a/Git/Queue.hs b/Git/Queue.hs index ddcf13519..f515ad104 100644 --- a/Git/Queue.hs +++ b/Git/Queue.hs @@ -19,7 +19,6 @@ module Git.Queue ( import qualified Data.Map as M import System.IO -import System.Cmd.Utils import Data.String.Utils import Utility.SafeCommand @@ -149,10 +148,11 @@ runAction repo (UpdateIndexAction streamers) = -- list is stored in reverse order Git.UpdateIndex.streamUpdateIndex repo $ reverse streamers runAction repo action@(CommandAction {}) = - pOpen WriteToPipe "xargs" ("-0":"git":params) feedxargs + withHandle StdinHandle createProcessSuccess (proc "xargs" params) $ \h -> do + fileEncoding h + hPutStr h $ join "\0" 
$ getFiles action + hClose h where - params = toCommand $ gitCommandLine + params = "-0":"git":baseparams + baseparams = toCommand $ gitCommandLine (Param (getSubcommand action):getParams action) repo - feedxargs h = do - fileEncoding h - hPutStr h $ join "\0" $ getFiles action diff --git a/Git/UpdateIndex.hs b/Git/UpdateIndex.hs index abdc4bcbe..929448729 100644 --- a/Git/UpdateIndex.hs +++ b/Git/UpdateIndex.hs @@ -17,8 +17,6 @@ module Git.UpdateIndex ( stageSymlink ) where -import System.Cmd.Utils - import Common import Git import Git.Types @@ -36,13 +34,13 @@ pureStreamer !s = \streamer -> streamer s {- Streams content into update-index from a list of Streamers. -} streamUpdateIndex :: Repo -> [Streamer] -> IO () -streamUpdateIndex repo as = do - (p, h) <- hPipeTo "git" (toCommand $ gitCommandLine params repo) - fileEncoding h - forM_ as (stream h) - hClose h - forceSuccess p +streamUpdateIndex repo as = + withHandle StdinHandle createProcessSuccess (proc "git" ps) $ \h -> do + fileEncoding h + forM_ as (stream h) + hClose h where + ps = toCommand $ gitCommandLine params repo params = map Param ["update-index", "-z", "--index-info"] stream h a = a (streamer h) streamer h s = do diff --git a/GitAnnex.hs b/GitAnnex.hs index bf1f27bfd..7b1fa5986 100644 --- a/GitAnnex.hs +++ b/GitAnnex.hs @@ -62,6 +62,7 @@ import qualified Command.Upgrade import qualified Command.Version #ifdef WITH_ASSISTANT import qualified Command.Watch +import qualified Command.Assistant #endif cmds :: [Command] @@ -106,6 +107,7 @@ cmds = concat , Command.Version.def #ifdef WITH_ASSISTANT , Command.Watch.def + , Command.Assistant.def #endif ] @@ -14,7 +14,7 @@ endif PREFIX=/usr IGNORE=-ignore-package monads-fd -ignore-package monads-tf -BASEFLAGS=-Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_ASSISTANT -DWITH_S3 $(BASEFLAGS_OPTS) +BASEFLAGS=-threaded -Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_ASSISTANT -DWITH_S3 $(BASEFLAGS_OPTS) GHCFLAGS=-O2 $(BASEFLAGS) CFLAGS=-Wall diff --git 
a/Remote/Bup.hs b/Remote/Bup.hs index 0d1b606d3..8a2c1afef 100644 --- a/Remote/Bup.hs +++ b/Remote/Bup.hs @@ -133,13 +133,13 @@ retrieveCheap :: BupRepo -> Key -> FilePath -> Annex Bool retrieveCheap _ _ _ = return False retrieveEncrypted :: BupRepo -> (Cipher, Key) -> Key -> FilePath -> Annex Bool -retrieveEncrypted buprepo (cipher, enck) _ f = do - let params = bupParams "join" buprepo [Param $ bupRef enck] - liftIO $ catchBoolIO $ do - (pid, h) <- hPipeFrom "bup" $ toCommand params +retrieveEncrypted buprepo (cipher, enck) _ f = liftIO $ catchBoolIO $ + withHandle StdoutHandle createProcessSuccess p $ \h -> do withDecryptedContent cipher (L.hGetContents h) $ L.writeFile f - forceSuccess pid return True + where + params = bupParams "join" buprepo [Param $ bupRef enck] + p = proc "bup" $ toCommand params remove :: Key -> Annex Bool remove _ = do diff --git a/Remote/Git.hs b/Remote/Git.hs index d80f580fc..3412de89b 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -127,16 +127,17 @@ tryGitConfigRead r =<< liftIO (try a :: IO (Either SomeException Git.Repo)) pipedconfig cmd params = safely $ - pOpen ReadFromPipe cmd (toCommand params) $ + withHandle StdoutHandle createProcessSuccess p $ Git.Config.hRead r + where + p = proc cmd $ toCommand params geturlconfig headers = do s <- Url.get (Git.repoLocation r ++ "/config") headers withTempFile "git-annex.tmp" $ \tmpfile h -> do hPutStr h s hClose h - pOpen ReadFromPipe "git" ["config", "--null", "--list", "--file", tmpfile] $ - Git.Config.hRead r + pipedconfig "git" [Param "config", Param "--null", Param "--list", Param "--file", File tmpfile] store = observe $ \r' -> do g <- gitRepo diff --git a/Remote/Hook.hs b/Remote/Hook.hs index 9e8d3c620..cad6e2fc9 100644 --- a/Remote/Hook.hs +++ b/Remote/Hook.hs @@ -9,7 +9,6 @@ module Remote.Hook (remote) where import qualified Data.ByteString.Lazy as L import qualified Data.Map as M -import System.Exit import System.Environment import Common.Annex @@ -136,17 +135,5 @@ 
checkPresent r h k = do findkey s = show k `elem` lines s check Nothing = error "checkpresent hook misconfigured" check (Just hook) = do - (frompipe, topipe) <- createPipe - pid <- forkProcess $ do - _ <- dupTo topipe stdOutput - closeFd frompipe - executeFile "sh" True ["-c", hook] - =<< hookEnv k Nothing - closeFd topipe - fromh <- fdToHandle frompipe - reply <- hGetContentsStrict fromh - hClose fromh - s <- getProcessStatus True False pid - case s of - Just (Exited ExitSuccess) -> return $ findkey reply - _ -> error "checkpresent hook failed" + env <- hookEnv k Nothing + findkey <$> readProcessEnv "sh" ["-c", hook] env diff --git a/Remote/Rsync.hs b/Remote/Rsync.hs index 29bceb2db..ee516a8a5 100644 --- a/Remote/Rsync.hs +++ b/Remote/Rsync.hs @@ -9,6 +9,7 @@ module Remote.Rsync (remote) where import qualified Data.ByteString.Lazy as L import qualified Data.Map as M +import System.Posix.Process (getProcessID) import Common.Annex import Types.Remote @@ -108,9 +108,9 @@ withNothing _ _ = error "This command takes no parameters." 
prepFiltered :: (FilePath -> CommandStart) -> Annex [FilePath] -> Annex [CommandStart] prepFiltered a fs = do matcher <- Limit.getMatcher - map (proc matcher) <$> fs + map (process matcher) <$> fs where - proc matcher f = do + process matcher f = do ok <- matcher f if ok then a f else return Nothing diff --git a/Utility/CoProcess.hs b/Utility/CoProcess.hs index 9fa8d864f..67f861bb3 100644 --- a/Utility/CoProcess.hs +++ b/Utility/CoProcess.hs @@ -13,23 +13,23 @@ module Utility.CoProcess ( query ) where -import System.Cmd.Utils - import Common -type CoProcessHandle = (PipeHandle, Handle, Handle) +type CoProcessHandle = (ProcessHandle, Handle, Handle, CreateProcess) start :: FilePath -> [String] -> IO CoProcessHandle -start command params = hPipeBoth command params +start command params = do + (from, to, _err, pid) <- runInteractiveProcess command params Nothing Nothing + return (pid, to, from, proc command params) stop :: CoProcessHandle -> IO () -stop (pid, from, to) = do +stop (pid, from, to, p) = do hClose to hClose from - forceSuccess pid + forceSuccessProcess p pid query :: CoProcessHandle -> (Handle -> IO a) -> (Handle -> IO b) -> IO b -query (_, from, to) send receive = do +query (_, from, to, _) send receive = do _ <- send to hFlush to receive from diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs index 11ce7baef..213aeb50a 100644 --- a/Utility/DirWatcher.hs +++ b/Utility/DirWatcher.hs @@ -17,10 +17,10 @@ import Utility.Types.DirWatcher #if WITH_INOTIFY import qualified Utility.INotify as INotify import qualified System.INotify as INotify -import Utility.ThreadScheduler #endif #if WITH_KQUEUE import qualified Utility.Kqueue as Kqueue +import Control.Concurrent #endif type Pruner = FilePath -> Bool @@ -72,19 +72,41 @@ closingTracked = undefined #endif #endif +/* Starts a watcher thread. The runStartup action is passed a scanner action + * to run, that will return once the initial directory scan is complete. 
+ * Once runStartup returns, the watcher thread continues running, + * and processing events. Returns a DirWatcherHandle that can be used + * to shutdown later. */ #if WITH_INOTIFY -watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () -watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do +type DirWatcherHandle = INotify.INotify +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle +watchDir dir prune hooks runstartup = do + i <- INotify.initINotify runstartup $ INotify.watchDir i dir prune hooks - waitForTermination -- Let the inotify thread run. + return i #else #if WITH_KQUEUE -watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO () +type DirWatcherHandle = ThreadId +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO DirWatcherHandle watchDir dir ignored hooks runstartup = do kq <- runstartup $ Kqueue.initKqueue dir ignored - Kqueue.runHooks kq hooks + forkIO $ Kqueue.runHooks kq hooks #else -watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () +type DirWatcherHandle = () +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle watchDir = undefined #endif #endif + +#if WITH_INOTIFY +stopWatchDir :: DirWatcherHandle -> IO () +stopWatchDir = INotify.killINotify +#else +#if WITH_KQUEUE +stopWatchDir :: DirWatcherHandle -> IO () +stopWatchDir = killThread +#else +stopWatchDir :: DirWatcherHandle -> IO () +stopWatchDir = undefined +#endif +#endif diff --git a/Utility/Gpg.hs b/Utility/Gpg.hs index e13afe5d4..eed77805c 100644 --- a/Utility/Gpg.hs +++ b/Utility/Gpg.hs @@ -11,8 +11,7 @@ import qualified Data.ByteString.Lazy as L import System.Posix.Types import Control.Applicative import Control.Concurrent -import Control.Exception (finally, bracket) -import System.Exit +import Control.Exception (bracket) import System.Posix.Env (setEnv, unsetEnv, getEnv) import Common 
@@ -39,18 +38,21 @@ stdParams params = do readStrict :: [CommandParam] -> IO String readStrict params = do params' <- stdParams params - pOpen ReadFromPipe "gpg" params' hGetContentsStrict + withHandle StdoutHandle createProcessSuccess (proc "gpg" params') $ \h -> do + hSetBinaryMode h True + hGetContentsStrict h {- Runs gpg, piping an input value to it, and returning its stdout, - strictly. -} pipeStrict :: [CommandParam] -> String -> IO String pipeStrict params input = do params' <- stdParams params - (pid, fromh, toh) <- hPipeBoth "gpg" params' - _ <- forkIO $ finally (hPutStr toh input) (hClose toh) - output <- hGetContentsStrict fromh - forceSuccess pid - return output + withBothHandles createProcessSuccess (proc "gpg" params') $ \(to, from) -> do + hSetBinaryMode to True + hSetBinaryMode from True + hPutStr to input + hClose to + hGetContentsStrict from {- Runs gpg with some parameters, first feeding it a passphrase via - --passphrase-fd, then feeding it an input, and passing a handle @@ -70,19 +72,13 @@ passphraseHandle params passphrase a b = do let passphrasefd = [Param "--passphrase-fd", Param $ show pfd] params' <- stdParams $ passphrasefd ++ params - (pid, fromh, toh) <- hPipeBoth "gpg" params' - pid2 <- forkProcess $ do - L.hPut toh =<< a - hClose toh - exitSuccess - hClose toh - ret <- b fromh - - -- cleanup - forceSuccess pid - _ <- getProcessStatus True False pid2 - closeFd frompipe - return ret + closeFd frompipe `after` + withBothHandles createProcessSuccess (proc "gpg" params') go + where + go (to, from) = do + L.hPut to =<< a + hClose to + b from {- Finds gpg public keys matching some string. (Could be an email address, - a key id, or a name. 
-} diff --git a/Utility/INotify.hs b/Utility/INotify.hs index bf87f4e71..6af022819 100644 --- a/Utility/INotify.hs +++ b/Utility/INotify.hs @@ -160,12 +160,9 @@ tooManyWatches hook dir = do querySysctl :: Read a => [CommandParam] -> IO (Maybe a) querySysctl ps = do - v <- catchMaybeIO $ hPipeFrom "sysctl" $ toCommand ps + v <- catchMaybeIO $ readProcess "sysctl" (toCommand ps) case v of Nothing -> return Nothing - Just (pid, h) -> do - val <- parsesysctl <$> hGetContentsStrict h - void $ getProcessStatus True False $ processID pid - return val + Just s -> return $ parsesysctl s where parsesysctl s = readish =<< lastMaybe (words s) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 7e7e653ec..c1a0a5cd6 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -14,8 +14,6 @@ module Utility.Kqueue ( waitChange, Change(..), changedFile, - isAdd, - isDelete, runHooks, ) where @@ -34,19 +32,19 @@ import Control.Concurrent data Change = Deleted FilePath + | DeletedDir FilePath | Added FilePath deriving (Show) isAdd :: Change -> Bool isAdd (Added _) = True isAdd (Deleted _) = False - -isDelete :: Change -> Bool -isDelete = not . isAdd +isAdd (DeletedDir _) = False changedFile :: Change -> FilePath changedFile (Added f) = f changedFile (Deleted f) = f +changedFile (DeletedDir f) = f data Kqueue = Kqueue { kqueueFd :: Fd @@ -59,27 +57,43 @@ type Pruner = FilePath -> Bool type DirMap = M.Map Fd DirInfo -{- A directory, and its last known contents (with filenames relative to it) -} +{- Enough information to uniquely identify a file in a directory, + - but not too much. -} +data DirEnt = DirEnt + { dirEnt :: FilePath -- relative to the parent directory + , _dirInode :: FileID -- included to notice file replacements + , isSubDir :: Bool + } + deriving (Eq, Ord, Show) + +{- A directory, and its last known contents. 
-} data DirInfo = DirInfo { dirName :: FilePath - , dirCache :: S.Set FilePath + , dirCache :: S.Set DirEnt } deriving (Show) getDirInfo :: FilePath -> IO DirInfo getDirInfo dir = do - contents <- S.fromList . filter (not . dirCruft) - <$> getDirectoryContents dir + l <- filter (not . dirCruft) <$> getDirectoryContents dir + contents <- S.fromList . catMaybes <$> mapM getDirEnt l return $ DirInfo dir contents + where + getDirEnt f = catchMaybeIO $ do + s <- getFileStatus (dir </> f) + return $ DirEnt f (fileID s) (isDirectory s) {- Difference between the dirCaches of two DirInfos. -} (//) :: DirInfo -> DirInfo -> [Change] oldc // newc = deleted ++ added where - deleted = calc Deleted oldc newc - added = calc Added newc oldc - calc a x y = map a . map (dirName x </>) $ - S.toList $ S.difference (dirCache x) (dirCache y) + deleted = calc gendel oldc newc + added = calc genadd newc oldc + gendel x = (if isSubDir x then DeletedDir else Deleted) $ + dirName oldc </> dirEnt x + genadd x = Added $ dirName newc </> dirEnt x + calc a x y = map a $ S.toList $ + S.difference (dirCache x) (dirCache y) {- Builds a map of directories in a tree, possibly pruning some. - Opens each directory in the tree, and records its current contents. -} @@ -99,7 +113,7 @@ scanRecursive topdir prune = M.fromList <$> walk [] [topdir] case mfd of Nothing -> walk c rest Just fd -> do - let subdirs = map (dir </>) $ + let subdirs = map (dir </>) . 
map dirEnt $ S.toList $ dirCache info walk ((fd, info):c) (subdirs ++ rest) @@ -123,7 +137,8 @@ removeSubDir dirmap dir = do findDirContents :: DirMap -> FilePath -> [FilePath] findDirContents dirmap dir = concatMap absolutecontents $ search where - absolutecontents i = map (dirName i </>) (S.toList $ dirCache i) + absolutecontents i = map (dirName i </>) + (map dirEnt $ S.toList $ dirCache i) search = map snd $ M.toList $ M.filter (\i -> dirName i == dir) dirmap @@ -224,12 +239,14 @@ runHooks kq hooks = do (q', changes) <- waitChange q forM_ changes $ dispatch (kqueueMap q') loop q' - -- Kqueue returns changes for both whole directories - -- being added and deleted, and individual files being - -- added and deleted. - dispatch dirmap change - | isAdd change = withstatus change $ dispatchadd dirmap - | otherwise = callhook delDirHook Nothing change + + dispatch _ change@(Deleted _) = + callhook delHook Nothing change + dispatch _ change@(DeletedDir _) = + callhook delDirHook Nothing change + dispatch dirmap change@(Added _) = + withstatus change $ dispatchadd dirmap + dispatchadd dirmap change s | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change @@ -237,12 +254,15 @@ runHooks kq hooks = do | Files.isRegularFile s = callhook addHook (Just s) change | otherwise = noop + recursiveadd dirmap change = do let contents = findDirContents dirmap $ changedFile change forM_ contents $ \f -> withstatus (Added f) $ dispatchadd dirmap + callhook h s change = case h hooks of Nothing -> noop Just a -> a (changedFile change) s + withstatus change a = maybe noop (a change) =<< (catchMaybeIO (getSymbolicLinkStatus (changedFile change))) diff --git a/Utility/Lsof.hs b/Utility/Lsof.hs index 0061dfe57..ce6a16283 100644 --- a/Utility/Lsof.hs +++ b/Utility/Lsof.hs @@ -33,11 +33,11 @@ queryDir path = query ["+d", path] - Note: If lsof is not available, this always returns [] ! 
-} query :: [String] -> IO [(FilePath, LsofOpenMode, ProcessInfo)] -query opts = do - (pid, s) <- pipeFrom "lsof" ("-F0can" : opts) - let !r = parse s - void $ getProcessStatus True False $ processID pid - return r +query opts = + withHandle StdoutHandle (createProcessChecked checkSuccessProcess) p $ \h -> do + parse <$> hGetContentsStrict h + where + p = proc "lsof" ("-F0can" : opts) {- Parsing null-delimited output like: - diff --git a/Utility/Misc.hs b/Utility/Misc.hs index 3b359139b..e11586467 100644 --- a/Utility/Misc.hs +++ b/Utility/Misc.hs @@ -33,7 +33,7 @@ separate c l = unbreak $ break c l | otherwise = (a, tail b) {- Breaks out the first line. -} -firstLine :: String-> String +firstLine :: String -> String firstLine = takeWhile (/= '\n') {- Splits a list into segments that are delimited by items matching diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs new file mode 100644 index 000000000..fcab2a90a --- /dev/null +++ b/Utility/Parallel.hs @@ -0,0 +1,35 @@ +{- parallel processing via threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Utility.Parallel where + +import Common + +import Control.Concurrent +import Control.Exception + +{- Runs an action in parallel with a set of values, in a set of threads. + - In order for the actions to truely run in parallel, requires GHC's + - threaded runtime, + - + - Returns the values partitioned into ones with which the action succeeded, + - and ones with which it failed. 
-} +inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v]) +inParallel a l = do + mvars <- mapM thread l + statuses <- mapM takeMVar mvars + return $ reduce $ partition snd $ zip l statuses + where + reduce (x,y) = (map fst x, map fst y) + thread v = do + mvar <- newEmptyMVar + _ <- forkIO $ do + r <- try (a v) :: IO (Either SomeException ()) + case r of + Left _ -> putMVar mvar False + Right _ -> putMVar mvar True + return mvar diff --git a/Utility/Process.hs b/Utility/Process.hs new file mode 100644 index 000000000..3b293df4f --- /dev/null +++ b/Utility/Process.hs @@ -0,0 +1,214 @@ +{- System.Process enhancements, including additional ways of running + - processes, and logging. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE Rank2Types #-} + +module Utility.Process ( + module X, + CreateProcess, + StdHandle(..), + readProcessEnv, + forceSuccessProcess, + checkSuccessProcess, + createProcessSuccess, + createProcessChecked, + createBackgroundProcess, + withHandle, + withBothHandles, + createProcess, + runInteractiveProcess, + writeReadProcess, + readProcess +) where + +import qualified System.Process +import System.Process as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess) +import System.Process hiding (createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode) +import System.Exit +import System.IO +import System.Log.Logger + +import Utility.Misc + +type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a + +data StdHandle = StdinHandle | StdoutHandle | StderrHandle + deriving (Eq) + +{- Like readProcess, but allows specifying the environment, and does + - not mess with stdin. 
-} +readProcessEnv :: FilePath -> [String] -> Maybe [(String, String)] -> IO String +readProcessEnv cmd args environ = + withHandle StdoutHandle createProcessSuccess p $ \h -> do + output <- hGetContentsStrict h + hClose h + return output + where + p = (proc cmd args) + { std_out = CreatePipe + , env = environ + } + +{- Waits for a ProcessHandle, and throws an exception if the process + - did not exit successfully. -} +forceSuccessProcess :: CreateProcess -> ProcessHandle -> IO () +forceSuccessProcess p pid = do + code <- waitForProcess pid + case code of + ExitSuccess -> return () + ExitFailure n -> error $ showCmd p ++ " exited " ++ show n + +{- Waits for a ProcessHandle and returns True if it exited successfully. -} +checkSuccessProcess :: ProcessHandle -> IO Bool +checkSuccessProcess pid = do + code <- waitForProcess pid + return $ code == ExitSuccess + +{- Runs createProcess, then an action on its handles, and then + - forceSuccessProcess. -} +createProcessSuccess :: CreateProcessRunner +createProcessSuccess p a = createProcessChecked (forceSuccessProcess p) p a + +{- Runs createProcess, then an action on its handles, and then + - an action on its exit code. -} +createProcessChecked :: (ProcessHandle -> IO b) -> CreateProcessRunner +createProcessChecked checker p a = do + t@(_, _, _, pid) <- createProcess p + r <- a t + _ <- checker pid + return r + +{- Leaves the process running, suitable for lazy streaming. + - Note: Zombies will result, and must be waited on. -} +createBackgroundProcess :: CreateProcessRunner +createBackgroundProcess p a = a =<< createProcess p + +{- Runs a CreateProcessRunner, on a CreateProcess structure, that + - is adjusted to pipe only from/to a single StdHandle, and passes + - the resulting Handle to an action. -} +withHandle + :: StdHandle + -> CreateProcessRunner + -> CreateProcess + -> (Handle -> IO a) + -> IO a +withHandle h creator p a = creator p' $ a . 
select + where + base = p + { std_in = Inherit + , std_out = Inherit + , std_err = Inherit + } + (select, p') + | h == StdinHandle = + (stdinHandle, base { std_in = CreatePipe }) + | h == StdoutHandle = + (stdoutHandle, base { std_out = CreatePipe }) + | h == StderrHandle = + (stderrHandle, base { std_err = CreatePipe }) + +{- Like withHandle, but passes (stdin, stdout) handles to the action. -} +withBothHandles + :: CreateProcessRunner + -> CreateProcess + -> ((Handle, Handle) -> IO a) + -> IO a +withBothHandles creator p a = creator p' $ a . bothHandles + where + p' = p + { std_in = CreatePipe + , std_out = CreatePipe + , std_err = Inherit + } + +{- Extract a desired handle from createProcess's tuple. + - These partial functions are safe as long as createProcess is run + - with appropriate parameters to set up the desired handle. + - Get it wrong and the runtime crash will always happen, so should be + - easily noticed. -} +type HandleExtractor = (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> Handle +stdinHandle :: HandleExtractor +stdinHandle (Just h, _, _, _) = h +stdinHandle _ = error "expected stdinHandle" +stdoutHandle :: HandleExtractor +stdoutHandle (_, Just h, _, _) = h +stdoutHandle _ = error "expected stdoutHandle" +stderrHandle :: HandleExtractor +stderrHandle (_, _, Just h, _) = h +stderrHandle _ = error "expected stderrHandle" +bothHandles :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> (Handle, Handle) +bothHandles (Just hin, Just hout, _, _) = (hin, hout) +bothHandles _ = error "expected bothHandles" + +{- Debugging trace for a CreateProcess. 
-} +debugProcess :: CreateProcess -> IO () +debugProcess p = do + debugM "Utility.Process" $ unwords + [ action ++ ":" + , showCmd p + , maybe "" show (env p) + ] + where + action + | piped (std_in p) && piped (std_out p) = "chat" + | piped (std_in p) = "feed" + | piped (std_out p) = "read" + | otherwise = "call" + piped Inherit = False + piped _ = True + +{- Shows the command that a CreateProcess will run. -} +showCmd :: CreateProcess -> String +showCmd = go . cmdspec + where + go (ShellCommand s) = s + go (RawCommand c ps) = c ++ " " ++ show ps + +{- Wrappers for System.Process functions that do debug logging. + - + - More could be added, but these are the only ones I usually need. + -} + +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess p = do + debugProcess p + System.Process.createProcess p + +runInteractiveProcess + :: FilePath + -> [String] + -> Maybe FilePath + -> Maybe [(String, String)] + -> IO (Handle, Handle, Handle, ProcessHandle) +runInteractiveProcess f args c e = do + debugProcess $ (proc f args) + { std_in = CreatePipe + , std_out = CreatePipe + , std_err = CreatePipe + } + System.Process.runInteractiveProcess f args c e + +{- I think this is a more descriptive name than System.Process.readProcess. -} +writeReadProcess + :: FilePath + -> [String] + -> String + -> IO String +writeReadProcess f args input = do + debugProcess $ (proc f args) { std_out = CreatePipe, std_in = CreatePipe } + System.Process.readProcess f args input + +{- Normally, when reading from a process, it does not need to be fed any + - input. 
-} +readProcess + :: FilePath + -> [String] + -> IO String +readProcess f args = do + debugProcess $ (proc f args) { std_out = CreatePipe } + System.Process.readProcess f args [] diff --git a/Utility/SafeCommand.hs b/Utility/SafeCommand.hs index aedf27137..19dd707b8 100644 --- a/Utility/SafeCommand.hs +++ b/Utility/SafeCommand.hs @@ -1,6 +1,6 @@ {- safely running shell commands - - - Copyright 2010-2011 Joey Hess <joey@kitenet.net> + - Copyright 2010-2012 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. -} @@ -8,11 +8,9 @@ module Utility.SafeCommand where import System.Exit -import qualified System.Posix.Process -import System.Posix.Process hiding (executeFile) -import System.Posix.Signals +import Utility.Process +import System.Process (env) import Data.String.Utils -import System.Log.Logger import Control.Applicative {- A type for parameters passed to a shell command. A command can @@ -42,7 +40,7 @@ boolSystem :: FilePath -> [CommandParam] -> IO Bool boolSystem command params = boolSystemEnv command params Nothing boolSystemEnv :: FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool -boolSystemEnv command params env = dispatch <$> safeSystemEnv command params env +boolSystemEnv command params environ = dispatch <$> safeSystemEnv command params environ where dispatch ExitSuccess = True dispatch _ = False @@ -51,36 +49,13 @@ boolSystemEnv command params env = dispatch <$> safeSystemEnv command params env safeSystem :: FilePath -> [CommandParam] -> IO ExitCode safeSystem command params = safeSystemEnv command params Nothing -{- SIGINT(ctrl-c) is allowed to propigate and will terminate the program. -} +{- Unlike many implementations of system, SIGINT(ctrl-c) is allowed + - to propigate and will terminate the program. 
-} safeSystemEnv :: FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO ExitCode -safeSystemEnv command params env = do - -- Going low-level because all the high-level system functions - -- block SIGINT etc. We need to block SIGCHLD, but allow - -- SIGINT to do its default program termination. - let sigset = addSignal sigCHLD emptySignalSet - oldint <- installHandler sigINT Default Nothing - oldset <- getSignalMask - blockSignals sigset - childpid <- forkProcess $ childaction oldint oldset - mps <- getProcessStatus True False childpid - restoresignals oldint oldset - case mps of - Just (Exited code) -> return code - _ -> error $ "unknown error running " ++ command - where - restoresignals oldint oldset = do - _ <- installHandler sigINT oldint Nothing - setSignalMask oldset - childaction oldint oldset = do - restoresignals oldint oldset - executeFile command True (toCommand params) env - -{- executeFile with debug logging -} -executeFile :: FilePath -> Bool -> [String] -> Maybe [(String, String)] -> IO () -executeFile c path p e = do - debugM "Utility.SafeCommand.executeFile" $ - "Running: " ++ c ++ " " ++ show p ++ " " ++ maybe "" show e - System.Posix.Process.executeFile c path p e +safeSystemEnv command params environ = do + (_, _, _, pid) <- createProcess (proc command $ toCommand params) + { env = environ } + waitForProcess pid {- Escapes a filename or other parameter to be safely able to be exposed to - the shell. -} diff --git a/Utility/TSet.hs b/Utility/TSet.hs new file mode 100644 index 000000000..24d345477 --- /dev/null +++ b/Utility/TSet.hs @@ -0,0 +1,39 @@ +{- Transactional sets + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Utility.TSet where + +import Common + +import Control.Concurrent.STM + +type TSet = TChan + +runTSet :: STM a -> IO a +runTSet = atomically + +newTSet :: IO (TSet a) +newTSet = atomically newTChan + +{- Gets the contents of the TSet. Blocks until at least one item is + - present. 
-} +getTSet :: TSet a -> IO [a] +getTSet tset = runTSet $ do + c <- readTChan tset + go [c] + where + go l = do + v <- tryReadTChan tset + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts items into a TSet. -} +putTSet :: TSet a -> [a] -> IO () +putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs + +{- Put a single item into a TSet. -} +putTSet1 :: TSet a -> a -> IO () +putTSet1 tset v = void $ runTSet $ writeTChan tset v diff --git a/Utility/TempFile.hs b/Utility/TempFile.hs index 4dcbf1cca..62e0fc859 100644 --- a/Utility/TempFile.hs +++ b/Utility/TempFile.hs @@ -9,7 +9,7 @@ module Utility.TempFile where import Control.Exception (bracket) import System.IO -import System.Posix.Process hiding (executeFile) +import System.Posix.Process import System.Directory import Utility.Exception diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs index c828a0593..ba7eae6a1 100644 --- a/Utility/Types/DirWatcher.hs +++ b/Utility/Types/DirWatcher.hs @@ -20,3 +20,6 @@ data WatchHooks = WatchHooks , delDirHook :: Hook FilePath , errHook :: Hook String -- error message } + +mkWatchHooks :: WatchHooks +mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn index 9b3e3b08e..12dc0aa3e 100644 --- a/doc/design/assistant/syncing.mdwn +++ b/doc/design/assistant/syncing.mdwn @@ -35,6 +35,12 @@ all the other git clones, at both the git level and the key/value level. (will need to use `GIT_SSH`, which needs to point to a command to run, not a shell command line) +## misc todo + +* --debug will show often unnecessary work being done. Optimise. +* It would be nice if, when a USB drive is connected, + syncing starts automatically. Use dbus on Linux? + ## data syncing There are two parts to data syncing. 
First, map the network and second, diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index c52a5f3bf..85a5a18f0 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -185,6 +185,10 @@ subdirectories). To not daemonize, run with --foreground ; to stop a running daemon, run with --stop +* assistant + + Like watch, but also automatically syncs changes to other remotes. + # REPOSITORY SETUP COMMANDS * init [description] diff --git a/doc/todo/assistant_threaded_runtime.mdwn b/doc/todo/assistant_threaded_runtime.mdwn index 3953cf062..03ba66acf 100644 --- a/doc/todo/assistant_threaded_runtime.mdwn +++ b/doc/todo/assistant_threaded_runtime.mdwn @@ -28,6 +28,9 @@ git-annex does not otherwise use threads, so this is surprising. --[[Joey]] > I've spent a lot of time debugging this, and trying to fix it, in the > "threaded" branch. There are still deadlocks. --[[Joey]] +>> Fixed, by switching from `System.Cmd.Utils` to `System.Process` +>> --[[Joey]] + --- It would be possible to not use the threaded runtime. Instead, we could diff --git a/git-annex.cabal b/git-annex.cabal index 0bd35e14f..e58bd4d95 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -1,5 +1,5 @@ Name: git-annex -Version: 3.20120629 +Version: 3.20120630 Cabal-Version: >= 1.8 License: GPL Maintainer: Joey Hess <joey@kitenet.net> @@ -40,11 +40,12 @@ Executable git-annex unix, containers, utf8-string, network, mtl, bytestring, old-locale, time, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, base == 4.5.*, monad-control, transformers-base, lifted-base, - IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance + IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process -- Need to list this because it's generated from a .hsc file. 
Other-Modules: Utility.Touch C-Sources: Utility/libdiskfree.c Extensions: CPP + GHC-Options: -threaded if flag(S3) Build-Depends: hS3 @@ -65,10 +66,11 @@ Test-Suite test unix, containers, utf8-string, network, mtl, bytestring, old-locale, time, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, base == 4.5.*, monad-control, transformers-base, lifted-base, - IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance + IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process Other-Modules: Utility.Touch C-Sources: Utility/libdiskfree.c Extensions: CPP + GHC-Options: -threaded source-repository head type: git @@ -14,6 +14,7 @@ import Test.QuickCheck import System.Posix.Directory (changeWorkingDirectory) import System.Posix.Files import System.Posix.Env +import System.Posix.Process import Control.Exception.Extensible import qualified Data.Map as M import System.IO.HVFS (SystemFS(..)) |