summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Annex/UUID.hs4
-rw-r--r--Assistant.hs96
-rw-r--r--Assistant/Changes.hs27
-rw-r--r--Assistant/Commits.hs34
-rw-r--r--Assistant/DaemonStatus.hs41
-rw-r--r--Assistant/Pushes.hs46
-rw-r--r--Assistant/ThreadedMonad.hs29
-rw-r--r--Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs)36
-rw-r--r--Assistant/Threads/Merger.hs87
-rw-r--r--Assistant/Threads/Pusher.hs90
-rw-r--r--Assistant/Threads/SanityChecker.hs (renamed from Assistant/SanityChecker.hs)25
-rw-r--r--Assistant/Threads/TransferWatcher.hs66
-rw-r--r--Assistant/Threads/Transferrer.hs104
-rw-r--r--Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs)47
-rw-r--r--Assistant/TransferQueue.hs75
-rw-r--r--Assistant/TransferSlots.hs40
-rw-r--r--Backend/SHA.hs16
-rw-r--r--Build/Configure.hs4
-rw-r--r--Command/Assistant.hs18
-rw-r--r--Command/Fsck.hs2
-rw-r--r--Command/Map.hs4
-rw-r--r--Command/Sync.hs43
-rw-r--r--Command/Watch.hs18
-rw-r--r--Common.hs3
-rw-r--r--Config.hs4
-rw-r--r--Git/Branch.hs8
-rw-r--r--Git/Command.hs40
-rw-r--r--Git/Config.hs11
-rw-r--r--Git/HashObject.hs8
-rw-r--r--Git/Queue.hs12
-rw-r--r--Git/UpdateIndex.hs14
-rw-r--r--GitAnnex.hs2
-rw-r--r--Makefile2
-rw-r--r--Remote/Bup.hs10
-rw-r--r--Remote/Git.hs7
-rw-r--r--Remote/Hook.hs17
-rw-r--r--Remote/Rsync.hs1
-rw-r--r--Seek.hs4
-rw-r--r--Utility/CoProcess.hs14
-rw-r--r--Utility/DirWatcher.hs36
-rw-r--r--Utility/Gpg.hs38
-rw-r--r--Utility/INotify.hs7
-rw-r--r--Utility/Kqueue.hs62
-rw-r--r--Utility/Lsof.hs10
-rw-r--r--Utility/Misc.hs2
-rw-r--r--Utility/Parallel.hs35
-rw-r--r--Utility/Process.hs214
-rw-r--r--Utility/SafeCommand.hs45
-rw-r--r--Utility/TSet.hs39
-rw-r--r--Utility/TempFile.hs2
-rw-r--r--Utility/Types/DirWatcher.hs3
-rw-r--r--doc/design/assistant/syncing.mdwn6
-rw-r--r--doc/git-annex.mdwn4
-rw-r--r--doc/todo/assistant_threaded_runtime.mdwn3
-rw-r--r--git-annex.cabal8
-rw-r--r--test.hs1
56 files changed, 1301 insertions, 323 deletions
diff --git a/Annex/UUID.hs b/Annex/UUID.hs
index 517840fba..09862f9fc 100644
--- a/Annex/UUID.hs
+++ b/Annex/UUID.hs
@@ -32,8 +32,10 @@ configkey = annexConfig "uuid"
{- Generates a UUID. There is a library for this, but it's not packaged,
- so use the command line tool. -}
genUUID :: IO UUID
-genUUID = pOpen ReadFromPipe command params $ liftM toUUID . hGetLine
+genUUID = gen . lines <$> readProcess command params
where
+ gen [] = error $ "no output from " ++ command
+ gen (l:_) = toUUID l
command = SysConfig.uuid
params
-- request a random uuid be generated
diff --git a/Assistant.hs b/Assistant.hs
index e924d9477..06484b086 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -21,15 +21,35 @@
- until this is complete.
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- - index, then commits.
- - Thread 6: status logger
+ - index, then commits. Also queues Transfer events to send added
+ - files to other remotes.
+ - Thread 6: pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: push retryer
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch.
+ - (This uses inotify on .git/refs/heads, so there are additional
+ - inotify threads associated with it, too.)
+ - Thread 9: transfer watcher
+ - Watches for transfer information files being created and removed,
+ - and maintains the DaemonStatus currentTransfers map and the
+ - TransferSlots QSemN.
+ - (This uses inotify on .git/annex/transfer/, so there are
+ - additional inotify threads associated with it, too.)
+ - Thread 10: transferrer
+ - Waits for Transfers to be queued and does them.
+ - Thread 11: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 12: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
- - Annex monad in IO actions run by the inotify and committer
+ - Annex monad in IO actions run by the watcher and committer
- threads. Thus, a single state is shared amoung the threads, and
- only one at a time can access it.
- DaemonStatusHandle: (MVar)
@@ -39,6 +59,20 @@
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
+ - FailedPushMap (STM TMVar)
+ - Failed pushes are indicated by writing to this TMVar. The push
+ - retrier blocks until they're available.
+ - TransferQueue (STM TChan)
+ - Transfers to make are indicated by writing to this channel.
+ - TransferSlots (QSemN)
+ - Count of the number of currently available transfer slots.
+ - Updated by the transfer watcher, this allows other threads
+ - to block until a slot is available.
+ - This MVar should only be manipulated from inside the Annex monad,
+ - which ensures it's accessed only after the ThreadState MVar.
-}
module Assistant where
@@ -47,39 +81,55 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
-import Assistant.Watcher
-import Assistant.Committer
-import Assistant.SanityChecker
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Threads.Watcher
+import Assistant.Threads.Committer
+import Assistant.Threads.Pusher
+import Assistant.Threads.Merger
+import Assistant.Threads.TransferWatcher
+import Assistant.Threads.Transferrer
+import Assistant.Threads.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
+import Utility.ThreadScheduler
import Control.Concurrent
-startDaemon :: Bool -> Annex ()
-startDaemon foreground
+startDaemon :: Bool -> Bool -> Annex ()
+startDaemon assistant foreground
| foreground = do
- showStart "watch" "."
+ showStart (if assistant then "assistant" else "watch") "."
go id
| otherwise = do
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
pidfile <- fromRepo gitAnnexPidFile
go $ Utility.Daemon.daemonize logfd (Just pidfile) False
where
- go a = withThreadState $ \st -> do
+ go daemonize = withThreadState $ \st -> do
checkCanWatch
dstatus <- startDaemonStatus
- liftIO $ a $ do
- changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
- _ <- forkIO $ daemonStatusThread st dstatus
- _ <- forkIO $ sanityCheckerThread st dstatus changechan
- -- Does not return.
- watchThread st dstatus changechan
+ liftIO $ daemonize $ run dstatus st
+ run dstatus st = do
+ changechan <- newChangeChan
+ commitchan <- newCommitChan
+ pushmap <- newFailedPushMap
+ transferqueue <- newTransferQueue
+ transferslots <- newTransferSlots
+ mapM_ (void . forkIO)
+ [ commitThread st changechan commitchan transferqueue dstatus
+ , pushThread st dstatus commitchan pushmap
+ , pushRetryThread st pushmap
+ , mergeThread st
+ , transferWatcherThread st dstatus
+ , transfererThread st dstatus transferqueue transferslots
+ , daemonStatusThread st dstatus
+ , sanityCheckerThread st dstatus transferqueue changechan
+ , watchThread st dstatus transferqueue changechan
+ ]
+ waitForTermination
stopDaemon :: Annex ()
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..eca922109 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -1,6 +1,8 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Changes where
@@ -8,14 +10,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +31,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +64,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..86fd7599f
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index e5ba3d151..64c441cee 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -1,6 +1,8 @@
{- git-annex assistant daemon status
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.DaemonStatus where
@@ -9,12 +11,15 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
+import Logs.Transfer
+import qualified Command.Sync
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
+import qualified Data.Map as M
data DaemonStatus = DaemonStatus
-- False when the daemon is performing its startup scan
@@ -25,9 +30,15 @@ data DaemonStatus = DaemonStatus
, sanityCheckRunning :: Bool
-- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime
+ -- Currently running file content transfers
+ , currentTransfers :: TransferMap
+ -- Ordered list of remotes to talk to.
+ , knownRemotes :: [Remote]
}
deriving (Show)
+type TransferMap = M.Map Transfer TransferInfo
+
type DaemonStatusHandle = MVar DaemonStatus
newDaemonStatus :: DaemonStatus
@@ -36,24 +47,33 @@ newDaemonStatus = DaemonStatus
, lastRunning = Nothing
, sanityCheckRunning = False
, lastSanityCheck = Nothing
+ , currentTransfers = M.empty
+ , knownRemotes = []
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
getDaemonStatus = liftIO . readMVar
-modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
-modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
+modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
+modifyDaemonStatus_ handle a = liftIO $ modifyMVar_ handle (return . a)
+
+modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b
+modifyDaemonStatus handle a = liftIO $ modifyMVar handle (return . a)
{- Load any previous daemon status file, and store it in the MVar for this
- - process to use as its DaemonStatus. -}
+ - process to use as its DaemonStatus. Also gets current transfer status. -}
startDaemonStatus :: Annex DaemonStatusHandle
startDaemonStatus = do
file <- fromRepo gitAnnexDaemonStatusFile
status <- liftIO $
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
+ transfers <- M.fromList <$> getTransfers
+ remotes <- Command.Sync.syncRemotes []
liftIO $ newMVar status
{ scanComplete = False
, sanityCheckRunning = False
+ , currentTransfers = transfers
+ , knownRemotes = remotes
}
{- This thread wakes up periodically and writes the daemon status to disk. -}
@@ -117,3 +137,18 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
+
+{- Mutates the transfer map. -}
+adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
+adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
+ \s -> s { currentTransfers = a (currentTransfers s) }
+
+{- Removes a transfer from the map, and returns its info. -}
+removeTransfer :: DaemonStatusHandle -> Transfer -> Annex (Maybe TransferInfo)
+removeTransfer dstatus t = modifyDaemonStatus dstatus go
+ where
+ go s =
+ let (info, ts) = M.updateLookupWithKey
+ (\_k _v -> Nothing)
+ t (currentTransfers s)
+ in (s { currentTransfers = ts }, info)
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f411dda07
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,46 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+
+import Control.Concurrent.STM
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Track the most recent push failure for each remote. -}
+type PushMap = M.Map Remote UTCTime
+type FailedPushMap = TMVar PushMap
+
+{- The TMVar starts empty, and is left empty when there are no
+ - failed pushes. This way we can block until there are some failed pushes.
+ -}
+newFailedPushMap :: IO FailedPushMap
+newFailedPushMap = atomically newEmptyTMVar
+
+{- Blocks until there are failed pushes.
+ - Returns Remotes whose pushes failed a given time duration or more ago.
+ - (This may be an empty list.) -}
+getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
+getFailedPushesBefore v duration = do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
+ where
+ toorecent now time = now `diffUTCTime` time < duration
+
+{- Modifies the map. -}
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
+changeFailedPushMap v a = atomically $
+ store . a . fromMaybe M.empty =<< tryTakeTMVar v
+ where
+ {- tryTakeTMVar empties the TMVar; refill it only if
+ - the modified map is not itself empty -}
+ store m
+ | m == M.empty = noop
+ | otherwise = putTMVar v $! m
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..1decd8e91 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -1,17 +1,17 @@
{- making the Annex monad available across threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE BangPatterns #-}
-
module Assistant.ThreadedMonad where
import Common.Annex
import qualified Annex
import Control.Concurrent
-import Control.Exception (throw)
+import Data.Tuple
{- The Annex state is stored in a MVar, so that threaded actions can access
- it. -}
@@ -32,13 +32,18 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
-runThreadState mvar a = do
- startstate <- takeMVar mvar
- -- catch IO errors and rethrow after restoring the MVar
- !(r, newstate) <- catchIO (Annex.run startstate a) $ \e -> do
- putMVar mvar startstate
- throw e
- putMVar mvar newstate
- return r
+runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
+
+{- Runs an Annex action, using a copy of the state from the MVar.
+ -
+ - It's up to the action to perform any necessary shutdown tasks in order
+ - for state to not be lost. And it's up to the caller to resynchronise
+ - with any changes the action makes to eg, the git-annex branch.
+ -}
+unsafeRunThreadState :: ThreadState -> Annex a -> IO ()
+unsafeRunThreadState mvar a = do
+ state <- readMVar mvar
+ void $ Annex.eval state a
diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs
index 63df8cafc..ff5cc9eab 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -1,14 +1,20 @@
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.Committer where
+module Assistant.Threads.Committer where
import Common.Annex
import Assistant.Changes
+import Assistant.Commits
import Assistant.ThreadedMonad
-import Assistant.Watcher
+import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -26,8 +32,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -36,10 +42,11 @@ commitThread st changechan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds st changechan changes
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes
@@ -93,8 +100,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
-handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@@ -106,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan
+ r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@@ -117,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) = do
- r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- handle (finishedChange change) (keyFilename ks)
- =<< Command.Add.ingest ks
- return $ maybeMaybe r
+ add change@(PendingAddChange { keySource = ks }) =
+ liftM maybeMaybe $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
add _ = return Nothing
maybeMaybe (Just j@(Just _)) = j
@@ -137,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
+ queueTransfers transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..c7da86a8d
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,87 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Annex.Branch
+import qualified Git
+import qualified Git.Command
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+import qualified Remote
+
+{- This thread watches for changes to .git/refs/heads/synced/*,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let changedbranch = Git.Ref $
+ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just changedbranch == current) $
+ void $ mergeBranch changedbranch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
+
+{- Manually pull from remotes and merge their branches. Called by the pusher
+ - when a push fails, which can happen due to a remote not having pushed
+ - changes to us. That could be because it doesn't have us as a remote, or
+ - because the assistant is not running there, or other reasons. -}
+manualPull :: Git.Ref -> [Remote] -> Annex ()
+manualPull currentbranch remotes = do
+ forM_ remotes $ \r ->
+ inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r]
+ Annex.Branch.forceUpdate
+ forM_ remotes $ \r ->
+ Command.Sync.mergeRemote r currentbranch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..6d6836120
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,90 @@
+{- git-annex assistant git pushing threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Common.Annex
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.DaemonStatus
+import Assistant.ThreadedMonad
+import Assistant.Threads.Merger
+import qualified Command.Sync
+import Utility.ThreadScheduler
+import Utility.Parallel
+
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> FailedPushMap -> IO ()
+pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ now <- getCurrentTime
+ pushToRemotes now st pushmap topush
+ where
+ halfhour = 1800
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st daemonstatus commitchan pushmap = do
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then do
+ remotes <- runThreadState st $
+ knownRemotes <$> getDaemonStatus daemonstatus
+ pushToRemotes now st pushmap remotes
+ else refillCommits commitchan commits
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
+
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
+pushToRemotes now st pushmap remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> Command.Sync.currentBranch
+ go True branch g remotes
+ where
+ go shouldretry branch g rs = do
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ (succeeded, failed) <- inParallel (push g branch) rs
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
+ unless (null failed || not shouldretry) $
+ retry branch g failed
+
+ makemap l = M.fromList $ zip l (repeat now)
+
+ push g branch remote =
+ ifM (Command.Sync.pushBranch remote branch g)
+ ( exitSuccess, exitFailure)
+
+ retry branch g rs = do
+ runThreadState st $ manualPull branch rs
+ go False branch g rs
diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index e2ca9da74..c5b99863e 100644
--- a/Assistant/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -1,9 +1,11 @@
{- git-annex assistant sanity checker
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.SanityChecker (
+module Assistant.Threads.SanityChecker (
sanityCheckerThread
) where
@@ -12,26 +14,27 @@ import qualified Git.LsFiles
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Changes
+import Assistant.TransferQueue
import Utility.ThreadScheduler
-import qualified Assistant.Watcher
+import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
-sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-sanityCheckerThread st status changechan = forever $ do
+sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+sanityCheckerThread st status transferqueue changechan = forever $ do
waitForNextCheck st status
runThreadState st $
- modifyDaemonStatus status $ \s -> s
+ modifyDaemonStatus_ status $ \s -> s
{ sanityCheckRunning = True }
now <- getPOSIXTime -- before check started
- catchIO (check st status changechan)
+ catchIO (check st status transferqueue changechan)
(runThreadState st . warning . show)
runThreadState st $ do
- modifyDaemonStatus status $ \s -> s
+ modifyDaemonStatus_ status $ \s -> s
{ sanityCheckRunning = False
, lastSanityCheck = Just now
}
@@ -56,8 +59,8 @@ oneDay = 24 * 60 * 60
{- It's important to stay out of the Annex monad as much as possible while
- running potentially expensive parts of this check, since remaining in it
- will block the watcher. -}
-check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-check st status changechan = do
+check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+check st status transferqueue changechan = do
g <- runThreadState st $ do
showSideAction "Running daily check"
fromRepo id
@@ -77,5 +80,5 @@ check st status changechan = do
insanity m = runThreadState st $ warning m
addsymlink file s = do
insanity $ "found unstaged symlink: " ++ file
- Assistant.Watcher.runHandler st status changechan
- Assistant.Watcher.onAddSymlink file s
+ Watcher.runHandler st status transferqueue changechan
+ Watcher.onAddSymlink file s
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
new file mode 100644
index 000000000..364ce0468
--- /dev/null
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -0,0 +1,66 @@
+{- git-annex assistant transfer watching thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferWatcher where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+
+import Data.Map as M
+
+{- This thread watches for changes to the gitAnnexTransferDir,
+ - and updates the DaemonStatus's map of ongoing transfers. -}
+transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
+transferWatcherThread st dstatus = do
+ g <- runThreadState st $ fromRepo id
+ let dir = gitAnnexTransferDir g
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler st dstatus a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+
+type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler st dstatus file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ _ msg _ = error msg
+
+{- Called when a new transfer information file is written. -}
+onAdd :: Handler
+onAdd st dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> runThreadState st $ go t =<< checkTransfer t
+ where
+ go _ Nothing = noop -- transfer already finished
+ go t (Just info) = adjustTransfers dstatus $
+ M.insertWith' merge t info
+ -- preseve transferTid, which is not written to disk
+ merge new old = new { transferTid = transferTid old }
+
+{- Called when a transfer information file is removed. -}
+onDel :: Handler
+onDel st dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> void $ runThreadState st $ removeTransfer dstatus t
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
new file mode 100644
index 000000000..c439d8b7e
--- /dev/null
+++ b/Assistant/Threads/Transferrer.hs
@@ -0,0 +1,104 @@
+{- git-annex assistant data transferrer thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Transferrer where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Logs.Transfer
+import Logs.Presence
+import Logs.Location
+import Annex.Content
+import qualified Remote
+
+import Data.Time.Clock.POSIX
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- For now only one transfer is run at a time. -}
+maxTransfers :: Int
+maxTransfers = 1
+
+{- Dispatches transfers from the queue. -}
+transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
+transfererThread st dstatus transferqueue slots = go
+ where
+ go = do
+ (t, info) <- getNextTransfer transferqueue
+ whenM (runThreadState st $ shouldTransfer dstatus t info) $
+ runTransfer st dstatus slots t info
+ go
+
+{- Checks if the requested transfer is already running, or
+ - the file to download is already present, or the remote
+ - being uploaded to isn't known to have the file. -}
+shouldTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> Annex Bool
+shouldTransfer dstatus t info =
+ go =<< currentTransfers <$> getDaemonStatus dstatus
+ where
+ go m
+ | M.member t m = return False
+ | transferDirection t == Download =
+ not <$> inAnnex key
+ | transferDirection t == Upload =
+ {- Trust the location log to check if the
+ - remote already has the key. This avoids
+ - a roundtrip to the remote. -}
+ case transferRemote info of
+ Nothing -> return False
+ Just remote ->
+ notElem (Remote.uuid remote)
+ <$> loggedLocations key
+ | otherwise = return False
+ key = transferKey t
+
+{- A transfer is run in a separate thread, with a *copy* of the Annex
+ - state. This is necessary to avoid blocking the rest of the assistant
+ - on the transfer completing, and also to allow multiple transfers to run
+ - at once. This requires GHC's threaded runtime to work!
+ -
+ - The copy of state means that the transfer processes are responsible
+ - for doing any necessary shutdown cleanups, and that the parent
+ - thread's cache must be invalidated once a transfer completes, as
+ - changes may have been made to the git-annex branch.
+ -}
+runTransfer :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> IO ()
+runTransfer st dstatus slots t info = case (transferRemote info, associatedFile info) of
+ (Nothing, _) -> noop
+ (_, Nothing) -> noop
+ (Just remote, Just file) -> do
+ tid <- inTransferSlot slots st $
+ transferprocess remote file
+ now <- getCurrentTime
+ runThreadState st $ adjustTransfers dstatus $
+ M.insertWith' const t info
+ { startedTime = Just $ utcTimeToPOSIXSeconds now
+ , transferTid = Just tid
+ }
+ where
+ isdownload = transferDirection t == Download
+ tofrom
+ | isdownload = "from"
+ | otherwise = "to"
+ key = transferKey t
+
+ transferprocess remote file = do
+ showStart "copy" file
+ showAction $ tofrom ++ " " ++ Remote.name remote
+ ok <- transfer t (Just file) $
+ if isdownload
+ then getViaTmp key $
+ Remote.retrieveKeyFile remote key (Just file)
+ else do
+ ok <- Remote.storeKey remote key $ Just file
+ when ok $
+ Remote.logStatus remote key InfoPresent
+ return ok
+ showEndResult ok
diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs
index db58f01e8..9f0eba74e 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -7,12 +7,14 @@
{-# LANGUAGE CPP #-}
-module Assistant.Watcher where
+module Assistant.Threads.Watcher where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
+import Assistant.TransferQueue
+import Logs.Transfer
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Annex
@@ -27,7 +29,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -46,11 +47,11 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain."
]
-watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-watchThread st dstatus changechan = watchDir "." ignored hooks startup
+watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+watchThread st dstatus transferqueue changechan = void $ watchDir "." ignored hooks startup
where
startup = statupScan st dstatus
- hook a = Just $ runHandler st dstatus changechan a
+ hook a = Just $ runHandler st dstatus transferqueue changechan a
hooks = WatchHooks
{ addHook = hook onAdd
, delHook = hook onDel
@@ -66,7 +67,7 @@ statupScan st dstatus scanner = do
showAction "scanning"
r <- scanner
runThreadState st $
- modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
+ modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
-- Notice any files that were deleted before watching was started.
runThreadState st $ do
@@ -83,23 +84,22 @@ ignored = ig . takeFileName
ig ".gitattributes" = True
ig _ = False
-type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
+type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a
- change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
-runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler st dstatus changechan handler file filestatus = void $ do
+runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus transferqueue changechan handler file filestatus = void $ do
r <- tryIO go
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
- go = runThreadState st $ handler file filestatus dstatus
+ go = runThreadState st $ handler file filestatus dstatus transferqueue
{- During initial directory scan, this will be run for any regular files
- that are already checked into git. We don't want to turn those into
@@ -120,7 +120,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
- the add.
-}
onAdd :: Handler
-onAdd file filestatus dstatus
+onAdd file filestatus dstatus _
| maybe False isRegularFile filestatus = do
ifM (scanComplete <$> getDaemonStatus dstatus)
( go
@@ -138,12 +138,15 @@ onAdd file filestatus dstatus
- before adding it.
-}
onAddSymlink :: Handler
-onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
+onAddSymlink file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
where
go (Just (key, _)) = do
link <- calcGitLink file key
ifM ((==) link <$> liftIO (readSymbolicLink file))
- ( ensurestaged link =<< getDaemonStatus dstatus
+ ( do
+ s <- getDaemonStatus dstatus
+ checkcontent key s
+ ensurestaged link s
, do
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
@@ -185,8 +188,16 @@ onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
stageSymlink file sha
madeChange file LinkChange
+ {- When a new link appears, after the startup scan,
+ - try to get the key's content. -}
+ checkcontent key daemonstatus
+ | scanComplete daemonstatus = unlessM (inAnnex key) $
+ queueTransfers transferqueue dstatus
+ key (Just file) Download
+ | otherwise = noop
+
onDel :: Handler
-onDel file _ _dstatus = do
+onDel file _ _dstatus _ = do
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
madeChange file RmChange
@@ -199,14 +210,14 @@ onDel file _ _dstatus = do
- command to get the recursive list of files in the directory, so rm is
- just as good. -}
onDelDir :: Handler
-onDelDir dir _ _dstatus = do
+onDelDir dir _ _dstatus _ = do
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange
{- Called when there's an error with inotify. -}
onErr :: Handler
-onErr msg _ _dstatus = do
+onErr msg _ _dstatus _ = do
warning msg
return Nothing
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
new file mode 100644
index 000000000..b0eca96c8
--- /dev/null
+++ b/Assistant/TransferQueue.hs
@@ -0,0 +1,75 @@
+{- git-annex assistant pending transfer queue
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferQueue where
+
+import Common.Annex
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Types.Remote
+import qualified Remote
+
+import Control.Concurrent.STM
+
+type TransferQueue = TChan (Transfer, TransferInfo)
+
+newTransferQueue :: IO TransferQueue
+newTransferQueue = atomically newTChan
+
+stubInfo :: AssociatedFile -> TransferInfo
+stubInfo f = TransferInfo
+ { startedTime = Nothing
+ , transferPid = Nothing
+ , transferTid = Nothing
+ , transferRemote = Nothing
+ , bytesComplete = Nothing
+ , associatedFile = f
+ }
+
+{- Adds pending transfers to the end of the queue for some of the known
+ - remotes. -}
+queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers q daemonstatus k f direction = do
+ rs <- knownRemotes <$> getDaemonStatus daemonstatus
+ mapM_ (\r -> queue r $ gentransfer r)
+ =<< sufficientremotes rs
+ where
+ sufficientremotes l
+ -- Queue downloads from all remotes that
+ -- have the key, with the cheapest ones first.
+ -- More expensive ones will only be tried if
+ -- downloading from a cheap one fails.
+ | direction == Download = do
+ uuids <- Remote.keyLocations k
+ return $ filter (\r -> uuid r `elem` uuids) l
+ -- TODO: Determine a smaller set of remotes that
+ -- can be uploaded to, in order to ensure all
+ -- remotes can access the content. Currently,
+ -- send to every remote we can.
+ | otherwise = return l
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
+ queue r t = liftIO $ void $ atomically $ do
+ let info = (stubInfo f) { transferRemote = Just r }
+ writeTChan q (t, info)
+
+{- Adds a pending transfer to the end of the queue. -}
+queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueTransfer q f t = void $ atomically $
+ writeTChan q (t, stubInfo f)
+
+{- Adds a pending transfer to the start of the queue, to be processed next. -}
+queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueNextTransfer q f t = void $ atomically $
+ unGetTChan q (t, stubInfo f)
+
+{- Blocks until a pending transfer is available in the queue. -}
+getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
+getNextTransfer = atomically . readTChan
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
new file mode 100644
index 000000000..dc077254d
--- /dev/null
+++ b/Assistant/TransferSlots.hs
@@ -0,0 +1,40 @@
+{- git-annex assistant transfer slots
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferSlots where
+
+import Control.Exception
+import Control.Concurrent
+
+import Common.Annex
+import Assistant.ThreadedMonad
+
+type TransferSlots = QSemN
+
+{- Number of concurrent transfers allowed to be run from the assistant.
+ -
+ - Transfers launched by other means, including by remote assistants,
+ - do not currently take up slots.
+ -}
+numSlots :: Int
+numSlots = 1
+
+newTransferSlots :: IO TransferSlots
+newTransferSlots = newQSemN numSlots
+
+{- Waits until a transfer slot becomes available, and runs a transfer
+ - action in the slot, in its own thread. -}
+inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId
+inTransferSlot s st a = forkIO $ bracket_ start done run
+ where
+ start = waitQSemN s 1
+ done = transferComplete s
+ run = unsafeRunThreadState st a
+
+{- Call when a transfer is complete. -}
+transferComplete :: TransferSlots -> IO ()
+transferComplete s = signalQSemN s 1
diff --git a/Backend/SHA.hs b/Backend/SHA.hs
index cf61139e0..bb400a768 100644
--- a/Backend/SHA.hs
+++ b/Backend/SHA.hs
@@ -53,14 +53,16 @@ shaN shasize file filesize = do
showAction "checksum"
case shaCommand shasize filesize of
Left sha -> liftIO $ sha <$> L.readFile file
- Right command -> liftIO $ runcommand command
+ Right command -> liftIO $ parse command . lines <$>
+ readProcess command (toCommand [File file])
where
- runcommand command =
- pOpen ReadFromPipe command (toCommand [File file]) $ \h -> do
- sha <- fst . separate (== ' ') <$> hGetLine h
- if null sha
- then error $ command ++ " parse error"
- else return sha
+ parse command [] = bad command
+ parse command (l:_)
+ | null sha = bad command
+ | otherwise = sha
+ where
+ sha = fst $ separate (== ' ') l
+ bad command = error $ command ++ " parse error"
shaCommand :: SHASize -> Integer -> Either (L.ByteString -> String) String
shaCommand shasize filesize
diff --git a/Build/Configure.hs b/Build/Configure.hs
index cf6623b22..9468e1704 100644
--- a/Build/Configure.hs
+++ b/Build/Configure.hs
@@ -4,7 +4,7 @@ module Build.Configure where
import System.Directory
import Data.List
-import System.Cmd.Utils
+import System.Process
import Control.Applicative
import System.FilePath
@@ -71,7 +71,7 @@ getVersionString = do
getGitVersion :: Test
getGitVersion = do
- (_, s) <- pipeFrom "git" ["--version"]
+ s <- readProcess "git" ["--version"] ""
let version = unwords $ drop 2 $ words $ head $ lines s
return $ Config "gitversion" (StringConfig version)
diff --git a/Command/Assistant.hs b/Command/Assistant.hs
new file mode 100644
index 000000000..60eac5d21
--- /dev/null
+++ b/Command/Assistant.hs
@@ -0,0 +1,18 @@
+{- git-annex assistant
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Command.Assistant where
+
+import Command
+import qualified Command.Watch
+
+def :: [Command]
+def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $
+ command "assistant" paramNothing seek "automatically handle changes"]
+
+seek :: [CommandSeek]
+seek = Command.Watch.mkSeek True
diff --git a/Command/Fsck.hs b/Command/Fsck.hs
index 10cca489b..0e3cc934c 100644
--- a/Command/Fsck.hs
+++ b/Command/Fsck.hs
@@ -7,6 +7,8 @@
module Command.Fsck where
+import System.Posix.Process (getProcessID)
+
import Common.Annex
import Command
import qualified Annex
diff --git a/Command/Map.hs b/Command/Map.hs
index 0773f6828..3dbdadbd6 100644
--- a/Command/Map.hs
+++ b/Command/Map.hs
@@ -199,8 +199,10 @@ tryScan r
Left _ -> return Nothing
Right r' -> return $ Just r'
pipedconfig cmd params = safely $
- pOpen ReadFromPipe cmd (toCommand params) $
+ withHandle StdoutHandle createProcessSuccess p $
Git.Config.hRead r
+ where
+ p = proc cmd $ toCommand params
configlist =
onRemote r (pipedconfig, Nothing) "configlist" [] []
diff --git a/Command/Sync.hs b/Command/Sync.hs
index bdb5d47a7..dfaed5949 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -39,7 +39,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
- !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@@ -49,6 +49,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
+
+currentBranch :: Annex Git.Ref
+currentBranch = do
+ !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ return branch
where
nobranch = error "no branch is checked out"
@@ -98,7 +103,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
- updateBranch syncbranch
+ inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@@ -107,17 +112,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
- updateBranch $ syncBranch branch
+ inRepo $ updateBranch $ syncBranch branch
stop
-updateBranch :: Git.Ref -> Annex ()
-updateBranch syncbranch =
+updateBranch :: Git.Ref -> Git.Repo -> IO ()
+updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
- go = inRepo $ Git.Command.runBool "branch"
+ go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
- ]
+ ] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@@ -143,19 +148,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
- needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
+ needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
- inRepo $ Git.Command.runBool "push"
- [ Param (Remote.name remote)
- , Param (show Annex.Branch.name)
- , Param refspec
- ]
- refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
- syncbranch = syncBranch branch
+ inRepo $ pushBranch remote branch
+
+pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
+pushBranch remote branch g =
+ Git.Command.runBool "push"
+ [ Param (Remote.name remote)
+ , Param (show Annex.Branch.name)
+ , Param refspec
+ ] g
+ where
+ refspec = concat
+ [ show $ Git.Ref.base branch
+ , ":"
+ , show $ Git.Ref.base $ syncBranch branch
+ ]
mergeAnnex :: CommandStart
mergeAnnex = do
diff --git a/Command/Watch.hs b/Command/Watch.hs
index 5681b3861..744844c4d 100644
--- a/Command/Watch.hs
+++ b/Command/Watch.hs
@@ -1,6 +1,3 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE BangPatterns #-}
-
{- git-annex watch command
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
@@ -19,10 +16,13 @@ def :: [Command]
def = [withOptions [foregroundOption, stopOption] $
command "watch" paramNothing seek "watch for changes"]
-seek :: [CommandSeek]
-seek = [withFlag stopOption $ \stopdaemon ->
+mkSeek :: Bool -> [CommandSeek]
+mkSeek assistant = [withFlag stopOption $ \stopdaemon ->
withFlag foregroundOption $ \foreground ->
- withNothing $ start foreground stopdaemon]
+ withNothing $ start assistant foreground stopdaemon]
+
+seek :: [CommandSeek]
+seek = mkSeek False
foregroundOption :: Option
foregroundOption = Option.flag [] "foreground" "do not daemonize"
@@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize"
stopOption :: Option
stopOption = Option.flag [] "stop" "stop daemon"
-start :: Bool -> Bool -> CommandStart
-start foreground stopdaemon = notBareRepo $ do
+start :: Bool -> Bool -> Bool -> CommandStart
+start assistant foreground stopdaemon = notBareRepo $ do
if stopdaemon
then stopDaemon
- else startDaemon foreground -- does not return
+ else startDaemon assistant foreground -- does not return
stop
diff --git a/Common.hs b/Common.hs
index 7f07781ce..04ec1e044 100644
--- a/Common.hs
+++ b/Common.hs
@@ -13,16 +13,15 @@ import Data.String.Utils as X
import System.Path as X
import System.FilePath as X
import System.Directory as X
-import System.Cmd.Utils as X hiding (safeSystem)
import System.IO as X hiding (FilePath)
import System.Posix.Files as X
import System.Posix.IO as X
-import System.Posix.Process as X hiding (executeFile)
import System.Exit as X
import Utility.Misc as X
import Utility.Exception as X
import Utility.SafeCommand as X
+import Utility.Process as X
import Utility.Path as X
import Utility.Directory as X
import Utility.Monad as X
diff --git a/Config.hs b/Config.hs
index e66947e2c..2c26adc73 100644
--- a/Config.hs
+++ b/Config.hs
@@ -56,7 +56,7 @@ remoteCost r def = do
cmd <- getRemoteConfig r "cost-command" ""
(fromMaybe def . readish) <$>
if not $ null cmd
- then liftIO $ snd <$> pipeFrom "sh" ["-c", cmd]
+ then liftIO $ readProcess "sh" ["-c", cmd]
else getRemoteConfig r "cost" ""
cheapRemoteCost :: Int
@@ -116,4 +116,4 @@ getHttpHeaders = do
cmd <- getConfig (annexConfig "http-headers-command") ""
if null cmd
then fromRepo $ Git.Config.getList "annex.http-headers"
- else lines . snd <$> liftIO (pipeFrom "sh" ["-c", cmd])
+ else lines <$> liftIO (readProcess "sh" ["-c", cmd])
diff --git a/Git/Branch.hs b/Git/Branch.hs
index 6edc1c306..4d239d8fc 100644
--- a/Git/Branch.hs
+++ b/Git/Branch.hs
@@ -73,12 +73,10 @@ commit :: String -> Branch -> [Ref] -> Repo -> IO Sha
commit message branch parentrefs repo = do
tree <- getSha "write-tree" $
pipeRead [Param "write-tree"] repo
- sha <- getSha "commit-tree" $
- ignorehandle $ pipeWriteRead
- (map Param $ ["commit-tree", show tree] ++ ps)
- message repo
+ sha <- getSha "commit-tree" $ pipeWriteRead
+ (map Param $ ["commit-tree", show tree] ++ ps)
+ message repo
run "update-ref" [Param $ show branch, Param $ show sha] repo
return sha
where
- ignorehandle a = snd <$> a
ps = concatMap (\r -> ["-p", show r]) parentrefs
diff --git a/Git/Command.hs b/Git/Command.hs
index 35f0838ba..cd6c98d33 100644
--- a/Git/Command.hs
+++ b/Git/Command.hs
@@ -7,10 +7,7 @@
module Git.Command where
-import qualified Data.Text.Lazy as L
-import qualified Data.Text.Lazy.IO as L
-import Control.Concurrent
-import Control.Exception (finally)
+import System.Posix.Process (getAnyProcessStatus)
import Common
import Git
@@ -43,30 +40,19 @@ run subcommand params repo = assertLocal repo $
- result unless reap is called.
-}
pipeRead :: [CommandParam] -> Repo -> IO String
-pipeRead params repo = assertLocal repo $ do
- (_, h) <- hPipeFrom "git" $ toCommand $ gitCommandLine params repo
- fileEncoding h
- hGetContents h
-
-{- Runs a git subcommand, feeding it input.
- - You should call either getProcessStatus or forceSuccess on the PipeHandle. -}
-pipeWrite :: [CommandParam] -> L.Text -> Repo -> IO PipeHandle
-pipeWrite params s repo = assertLocal repo $ do
- (p, h) <- hPipeTo "git" (toCommand $ gitCommandLine params repo)
- L.hPutStr h s
- hClose h
- return p
+pipeRead params repo = assertLocal repo $
+ withHandle StdoutHandle createBackgroundProcess p $ \h -> do
+ fileEncoding h
+ hGetContents h
+ where
+ p = proc "git" $ toCommand $ gitCommandLine params repo
-{- Runs a git subcommand, feeding it input, and returning its output.
- - You should call either getProcessStatus or forceSuccess on the PipeHandle. -}
-pipeWriteRead :: [CommandParam] -> String -> Repo -> IO (PipeHandle, String)
-pipeWriteRead params s repo = assertLocal repo $ do
- (p, from, to) <- hPipeBoth "git" (toCommand $ gitCommandLine params repo)
- fileEncoding to
- fileEncoding from
- _ <- forkIO $ finally (hPutStr to s) (hClose to)
- c <- hGetContents from
- return (p, c)
+{- Runs a git subcommand, feeding it input, and returning its output,
+ - which is expected to be fairly small, since it's all read into memory
+ - strictly. -}
+pipeWriteRead :: [CommandParam] -> String -> Repo -> IO String
+pipeWriteRead params s repo = assertLocal repo $
+ writeReadProcess "git" (toCommand $ gitCommandLine params repo) s
{- Reads null terminated output of a git command (as enabled by the -z
- parameter), and splits it. -}
diff --git a/Git/Config.hs b/Git/Config.hs
index c9e4f9a2d..c82d6bb1b 100644
--- a/Git/Config.hs
+++ b/Git/Config.hs
@@ -9,6 +9,7 @@ module Git.Config where
import qualified Data.Map as M
import Data.Char
+import System.Process (cwd)
import Common
import Git
@@ -39,7 +40,7 @@ reRead :: Repo -> IO Repo
reRead = read'
{- Cannot use pipeRead because it relies on the config having been already
- - read. Instead, chdir to the repo.
+ - read. Instead, chdir to the repo and run git config.
-}
read' :: Repo -> IO Repo
read' repo = go repo
@@ -47,9 +48,11 @@ read' repo = go repo
go Repo { location = Local { gitdir = d } } = git_config d
go Repo { location = LocalUnknown d } = git_config d
go _ = assertLocal repo $ error "internal"
- git_config d = bracketCd d $
- pOpen ReadFromPipe "git" ["config", "--null", "--list"] $
- hRead repo
+ git_config d = withHandle StdoutHandle createProcessSuccess p $
+ hRead repo
+ where
+ params = ["config", "--null", "--list"]
+ p = (proc "git" params) { cwd = Just d }
{- Reads git config from a handle and populates a repo with it. -}
hRead :: Repo -> Handle -> IO Repo
diff --git a/Git/HashObject.hs b/Git/HashObject.hs
index 9f37de5ba..c90c9ec3d 100644
--- a/Git/HashObject.hs
+++ b/Git/HashObject.hs
@@ -38,11 +38,9 @@ hashFile h file = CoProcess.query h send receive
{- Injects some content into git, returning its Sha. -}
hashObject :: ObjectType -> String -> Repo -> IO Sha
hashObject objtype content repo = getSha subcmd $ do
- (h, s) <- pipeWriteRead (map Param params) content repo
- length s `seq` do
- forceSuccess h
- reap -- XXX unsure why this is needed
- return s
+ s <- pipeWriteRead (map Param params) content repo
+ reap -- XXX unsure why this is needed, of if it is anymore
+ return s
where
subcmd = "hash-object"
params = [subcmd, "-t", show objtype, "-w", "--stdin"]
diff --git a/Git/Queue.hs b/Git/Queue.hs
index ddcf13519..f515ad104 100644
--- a/Git/Queue.hs
+++ b/Git/Queue.hs
@@ -19,7 +19,6 @@ module Git.Queue (
import qualified Data.Map as M
import System.IO
-import System.Cmd.Utils
import Data.String.Utils
import Utility.SafeCommand
@@ -149,10 +148,11 @@ runAction repo (UpdateIndexAction streamers) =
-- list is stored in reverse order
Git.UpdateIndex.streamUpdateIndex repo $ reverse streamers
runAction repo action@(CommandAction {}) =
- pOpen WriteToPipe "xargs" ("-0":"git":params) feedxargs
+ withHandle StdinHandle createProcessSuccess (proc "xargs" params) $ \h -> do
+ fileEncoding h
+ hPutStr h $ join "\0" $ getFiles action
+ hClose h
where
- params = toCommand $ gitCommandLine
+ params = "-0":"git":baseparams
+ baseparams = toCommand $ gitCommandLine
(Param (getSubcommand action):getParams action) repo
- feedxargs h = do
- fileEncoding h
- hPutStr h $ join "\0" $ getFiles action
diff --git a/Git/UpdateIndex.hs b/Git/UpdateIndex.hs
index abdc4bcbe..929448729 100644
--- a/Git/UpdateIndex.hs
+++ b/Git/UpdateIndex.hs
@@ -17,8 +17,6 @@ module Git.UpdateIndex (
stageSymlink
) where
-import System.Cmd.Utils
-
import Common
import Git
import Git.Types
@@ -36,13 +34,13 @@ pureStreamer !s = \streamer -> streamer s
{- Streams content into update-index from a list of Streamers. -}
streamUpdateIndex :: Repo -> [Streamer] -> IO ()
-streamUpdateIndex repo as = do
- (p, h) <- hPipeTo "git" (toCommand $ gitCommandLine params repo)
- fileEncoding h
- forM_ as (stream h)
- hClose h
- forceSuccess p
+streamUpdateIndex repo as =
+ withHandle StdinHandle createProcessSuccess (proc "git" ps) $ \h -> do
+ fileEncoding h
+ forM_ as (stream h)
+ hClose h
where
+ ps = toCommand $ gitCommandLine params repo
params = map Param ["update-index", "-z", "--index-info"]
stream h a = a (streamer h)
streamer h s = do
diff --git a/GitAnnex.hs b/GitAnnex.hs
index bf1f27bfd..7b1fa5986 100644
--- a/GitAnnex.hs
+++ b/GitAnnex.hs
@@ -62,6 +62,7 @@ import qualified Command.Upgrade
import qualified Command.Version
#ifdef WITH_ASSISTANT
import qualified Command.Watch
+import qualified Command.Assistant
#endif
cmds :: [Command]
@@ -106,6 +107,7 @@ cmds = concat
, Command.Version.def
#ifdef WITH_ASSISTANT
, Command.Watch.def
+ , Command.Assistant.def
#endif
]
diff --git a/Makefile b/Makefile
index 4d5628746..0afb10a7b 100644
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ endif
PREFIX=/usr
IGNORE=-ignore-package monads-fd -ignore-package monads-tf
-BASEFLAGS=-Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_ASSISTANT -DWITH_S3 $(BASEFLAGS_OPTS)
+BASEFLAGS=-threaded -Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_ASSISTANT -DWITH_S3 $(BASEFLAGS_OPTS)
GHCFLAGS=-O2 $(BASEFLAGS)
CFLAGS=-Wall
diff --git a/Remote/Bup.hs b/Remote/Bup.hs
index 0d1b606d3..8a2c1afef 100644
--- a/Remote/Bup.hs
+++ b/Remote/Bup.hs
@@ -133,13 +133,13 @@ retrieveCheap :: BupRepo -> Key -> FilePath -> Annex Bool
retrieveCheap _ _ _ = return False
retrieveEncrypted :: BupRepo -> (Cipher, Key) -> Key -> FilePath -> Annex Bool
-retrieveEncrypted buprepo (cipher, enck) _ f = do
- let params = bupParams "join" buprepo [Param $ bupRef enck]
- liftIO $ catchBoolIO $ do
- (pid, h) <- hPipeFrom "bup" $ toCommand params
+retrieveEncrypted buprepo (cipher, enck) _ f = liftIO $ catchBoolIO $
+ withHandle StdoutHandle createProcessSuccess p $ \h -> do
withDecryptedContent cipher (L.hGetContents h) $ L.writeFile f
- forceSuccess pid
return True
+ where
+ params = bupParams "join" buprepo [Param $ bupRef enck]
+ p = proc "bup" $ toCommand params
remove :: Key -> Annex Bool
remove _ = do
diff --git a/Remote/Git.hs b/Remote/Git.hs
index d80f580fc..3412de89b 100644
--- a/Remote/Git.hs
+++ b/Remote/Git.hs
@@ -127,16 +127,17 @@ tryGitConfigRead r
=<< liftIO (try a :: IO (Either SomeException Git.Repo))
pipedconfig cmd params = safely $
- pOpen ReadFromPipe cmd (toCommand params) $
+ withHandle StdoutHandle createProcessSuccess p $
Git.Config.hRead r
+ where
+ p = proc cmd $ toCommand params
geturlconfig headers = do
s <- Url.get (Git.repoLocation r ++ "/config") headers
withTempFile "git-annex.tmp" $ \tmpfile h -> do
hPutStr h s
hClose h
- pOpen ReadFromPipe "git" ["config", "--null", "--list", "--file", tmpfile] $
- Git.Config.hRead r
+ pipedconfig "git" [Param "config", Param "--null", Param "--list", Param "--file", File tmpfile]
store = observe $ \r' -> do
g <- gitRepo
diff --git a/Remote/Hook.hs b/Remote/Hook.hs
index 9e8d3c620..cad6e2fc9 100644
--- a/Remote/Hook.hs
+++ b/Remote/Hook.hs
@@ -9,7 +9,6 @@ module Remote.Hook (remote) where
import qualified Data.ByteString.Lazy as L
import qualified Data.Map as M
-import System.Exit
import System.Environment
import Common.Annex
@@ -136,17 +135,5 @@ checkPresent r h k = do
findkey s = show k `elem` lines s
check Nothing = error "checkpresent hook misconfigured"
check (Just hook) = do
- (frompipe, topipe) <- createPipe
- pid <- forkProcess $ do
- _ <- dupTo topipe stdOutput
- closeFd frompipe
- executeFile "sh" True ["-c", hook]
- =<< hookEnv k Nothing
- closeFd topipe
- fromh <- fdToHandle frompipe
- reply <- hGetContentsStrict fromh
- hClose fromh
- s <- getProcessStatus True False pid
- case s of
- Just (Exited ExitSuccess) -> return $ findkey reply
- _ -> error "checkpresent hook failed"
+ env <- hookEnv k Nothing
+ findkey <$> readProcessEnv "sh" ["-c", hook] env
diff --git a/Remote/Rsync.hs b/Remote/Rsync.hs
index 29bceb2db..ee516a8a5 100644
--- a/Remote/Rsync.hs
+++ b/Remote/Rsync.hs
@@ -9,6 +9,7 @@ module Remote.Rsync (remote) where
import qualified Data.ByteString.Lazy as L
import qualified Data.Map as M
+import System.Posix.Process (getProcessID)
import Common.Annex
import Types.Remote
diff --git a/Seek.hs b/Seek.hs
index 2cf0d8d46..3306a02fc 100644
--- a/Seek.hs
+++ b/Seek.hs
@@ -108,9 +108,9 @@ withNothing _ _ = error "This command takes no parameters."
prepFiltered :: (FilePath -> CommandStart) -> Annex [FilePath] -> Annex [CommandStart]
prepFiltered a fs = do
matcher <- Limit.getMatcher
- map (proc matcher) <$> fs
+ map (process matcher) <$> fs
where
- proc matcher f = do
+ process matcher f = do
ok <- matcher f
if ok then a f else return Nothing
diff --git a/Utility/CoProcess.hs b/Utility/CoProcess.hs
index 9fa8d864f..67f861bb3 100644
--- a/Utility/CoProcess.hs
+++ b/Utility/CoProcess.hs
@@ -13,23 +13,23 @@ module Utility.CoProcess (
query
) where
-import System.Cmd.Utils
-
import Common
-type CoProcessHandle = (PipeHandle, Handle, Handle)
+type CoProcessHandle = (ProcessHandle, Handle, Handle, CreateProcess)
start :: FilePath -> [String] -> IO CoProcessHandle
-start command params = hPipeBoth command params
+start command params = do
+ (from, to, _err, pid) <- runInteractiveProcess command params Nothing Nothing
+ return (pid, to, from, proc command params)
stop :: CoProcessHandle -> IO ()
-stop (pid, from, to) = do
+stop (pid, from, to, p) = do
hClose to
hClose from
- forceSuccess pid
+ forceSuccessProcess p pid
query :: CoProcessHandle -> (Handle -> IO a) -> (Handle -> IO b) -> IO b
-query (_, from, to) send receive = do
+query (_, from, to, _) send receive = do
_ <- send to
hFlush to
receive from
diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs
index 11ce7baef..213aeb50a 100644
--- a/Utility/DirWatcher.hs
+++ b/Utility/DirWatcher.hs
@@ -17,10 +17,10 @@ import Utility.Types.DirWatcher
#if WITH_INOTIFY
import qualified Utility.INotify as INotify
import qualified System.INotify as INotify
-import Utility.ThreadScheduler
#endif
#if WITH_KQUEUE
import qualified Utility.Kqueue as Kqueue
+import Control.Concurrent
#endif
type Pruner = FilePath -> Bool
@@ -72,19 +72,41 @@ closingTracked = undefined
#endif
#endif
+/* Starts a watcher thread. The runStartup action is passed a scanner action
+ * to run, that will return once the initial directory scan is complete.
+ * Once runStartup returns, the watcher thread continues running,
+ * and processing events. Returns a DirWatcherHandle that can be used
+ * to shutdown later. */
#if WITH_INOTIFY
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
-watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do
+type DirWatcherHandle = INotify.INotify
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
+watchDir dir prune hooks runstartup = do
+ i <- INotify.initINotify
runstartup $ INotify.watchDir i dir prune hooks
- waitForTermination -- Let the inotify thread run.
+ return i
#else
#if WITH_KQUEUE
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO ()
+type DirWatcherHandle = ThreadId
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO DirWatcherHandle
watchDir dir ignored hooks runstartup = do
kq <- runstartup $ Kqueue.initKqueue dir ignored
- Kqueue.runHooks kq hooks
+ forkIO $ Kqueue.runHooks kq hooks
#else
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
+type DirWatcherHandle = ()
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
watchDir = undefined
#endif
#endif
+
+#if WITH_INOTIFY
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = INotify.killINotify
+#else
+#if WITH_KQUEUE
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = killThread
+#else
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = undefined
+#endif
+#endif
diff --git a/Utility/Gpg.hs b/Utility/Gpg.hs
index e13afe5d4..eed77805c 100644
--- a/Utility/Gpg.hs
+++ b/Utility/Gpg.hs
@@ -11,8 +11,7 @@ import qualified Data.ByteString.Lazy as L
import System.Posix.Types
import Control.Applicative
import Control.Concurrent
-import Control.Exception (finally, bracket)
-import System.Exit
+import Control.Exception (bracket)
import System.Posix.Env (setEnv, unsetEnv, getEnv)
import Common
@@ -39,18 +38,21 @@ stdParams params = do
readStrict :: [CommandParam] -> IO String
readStrict params = do
params' <- stdParams params
- pOpen ReadFromPipe "gpg" params' hGetContentsStrict
+ withHandle StdoutHandle createProcessSuccess (proc "gpg" params') $ \h -> do
+ hSetBinaryMode h True
+ hGetContentsStrict h
{- Runs gpg, piping an input value to it, and returning its stdout,
- strictly. -}
pipeStrict :: [CommandParam] -> String -> IO String
pipeStrict params input = do
params' <- stdParams params
- (pid, fromh, toh) <- hPipeBoth "gpg" params'
- _ <- forkIO $ finally (hPutStr toh input) (hClose toh)
- output <- hGetContentsStrict fromh
- forceSuccess pid
- return output
+ withBothHandles createProcessSuccess (proc "gpg" params') $ \(to, from) -> do
+ hSetBinaryMode to True
+ hSetBinaryMode from True
+ hPutStr to input
+ hClose to
+ hGetContentsStrict from
{- Runs gpg with some parameters, first feeding it a passphrase via
- --passphrase-fd, then feeding it an input, and passing a handle
@@ -70,19 +72,13 @@ passphraseHandle params passphrase a b = do
let passphrasefd = [Param "--passphrase-fd", Param $ show pfd]
params' <- stdParams $ passphrasefd ++ params
- (pid, fromh, toh) <- hPipeBoth "gpg" params'
- pid2 <- forkProcess $ do
- L.hPut toh =<< a
- hClose toh
- exitSuccess
- hClose toh
- ret <- b fromh
-
- -- cleanup
- forceSuccess pid
- _ <- getProcessStatus True False pid2
- closeFd frompipe
- return ret
+ closeFd frompipe `after`
+ withBothHandles createProcessSuccess (proc "gpg" params') go
+ where
+ go (to, from) = do
+ L.hPut to =<< a
+ hClose to
+ b from
{- Finds gpg public keys matching some string. (Could be an email address,
- a key id, or a name. -}
diff --git a/Utility/INotify.hs b/Utility/INotify.hs
index bf87f4e71..6af022819 100644
--- a/Utility/INotify.hs
+++ b/Utility/INotify.hs
@@ -160,12 +160,9 @@ tooManyWatches hook dir = do
querySysctl :: Read a => [CommandParam] -> IO (Maybe a)
querySysctl ps = do
- v <- catchMaybeIO $ hPipeFrom "sysctl" $ toCommand ps
+ v <- catchMaybeIO $ readProcess "sysctl" (toCommand ps)
case v of
Nothing -> return Nothing
- Just (pid, h) -> do
- val <- parsesysctl <$> hGetContentsStrict h
- void $ getProcessStatus True False $ processID pid
- return val
+ Just s -> return $ parsesysctl s
where
parsesysctl s = readish =<< lastMaybe (words s)
diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs
index 7e7e653ec..c1a0a5cd6 100644
--- a/Utility/Kqueue.hs
+++ b/Utility/Kqueue.hs
@@ -14,8 +14,6 @@ module Utility.Kqueue (
waitChange,
Change(..),
changedFile,
- isAdd,
- isDelete,
runHooks,
) where
@@ -34,19 +32,19 @@ import Control.Concurrent
data Change
= Deleted FilePath
+ | DeletedDir FilePath
| Added FilePath
deriving (Show)
isAdd :: Change -> Bool
isAdd (Added _) = True
isAdd (Deleted _) = False
-
-isDelete :: Change -> Bool
-isDelete = not . isAdd
+isAdd (DeletedDir _) = False
changedFile :: Change -> FilePath
changedFile (Added f) = f
changedFile (Deleted f) = f
+changedFile (DeletedDir f) = f
data Kqueue = Kqueue
{ kqueueFd :: Fd
@@ -59,27 +57,43 @@ type Pruner = FilePath -> Bool
type DirMap = M.Map Fd DirInfo
-{- A directory, and its last known contents (with filenames relative to it) -}
+{- Enough information to uniquely identify a file in a directory,
+ - but not too much. -}
+data DirEnt = DirEnt
+ { dirEnt :: FilePath -- relative to the parent directory
+ , _dirInode :: FileID -- included to notice file replacements
+ , isSubDir :: Bool
+ }
+ deriving (Eq, Ord, Show)
+
+{- A directory, and its last known contents. -}
data DirInfo = DirInfo
{ dirName :: FilePath
- , dirCache :: S.Set FilePath
+ , dirCache :: S.Set DirEnt
}
deriving (Show)
getDirInfo :: FilePath -> IO DirInfo
getDirInfo dir = do
- contents <- S.fromList . filter (not . dirCruft)
- <$> getDirectoryContents dir
+ l <- filter (not . dirCruft) <$> getDirectoryContents dir
+ contents <- S.fromList . catMaybes <$> mapM getDirEnt l
return $ DirInfo dir contents
+ where
+ getDirEnt f = catchMaybeIO $ do
+ s <- getFileStatus (dir </> f)
+ return $ DirEnt f (fileID s) (isDirectory s)
{- Difference between the dirCaches of two DirInfos. -}
(//) :: DirInfo -> DirInfo -> [Change]
oldc // newc = deleted ++ added
where
- deleted = calc Deleted oldc newc
- added = calc Added newc oldc
- calc a x y = map a . map (dirName x </>) $
- S.toList $ S.difference (dirCache x) (dirCache y)
+ deleted = calc gendel oldc newc
+ added = calc genadd newc oldc
+ gendel x = (if isSubDir x then DeletedDir else Deleted) $
+ dirName oldc </> dirEnt x
+ genadd x = Added $ dirName newc </> dirEnt x
+ calc a x y = map a $ S.toList $
+ S.difference (dirCache x) (dirCache y)
{- Builds a map of directories in a tree, possibly pruning some.
- Opens each directory in the tree, and records its current contents. -}
@@ -99,7 +113,7 @@ scanRecursive topdir prune = M.fromList <$> walk [] [topdir]
case mfd of
Nothing -> walk c rest
Just fd -> do
- let subdirs = map (dir </>) $
+ let subdirs = map (dir </>) . map dirEnt $
S.toList $ dirCache info
walk ((fd, info):c) (subdirs ++ rest)
@@ -123,7 +137,8 @@ removeSubDir dirmap dir = do
findDirContents :: DirMap -> FilePath -> [FilePath]
findDirContents dirmap dir = concatMap absolutecontents $ search
where
- absolutecontents i = map (dirName i </>) (S.toList $ dirCache i)
+ absolutecontents i = map (dirName i </>)
+ (map dirEnt $ S.toList $ dirCache i)
search = map snd $ M.toList $
M.filter (\i -> dirName i == dir) dirmap
@@ -224,12 +239,14 @@ runHooks kq hooks = do
(q', changes) <- waitChange q
forM_ changes $ dispatch (kqueueMap q')
loop q'
- -- Kqueue returns changes for both whole directories
- -- being added and deleted, and individual files being
- -- added and deleted.
- dispatch dirmap change
- | isAdd change = withstatus change $ dispatchadd dirmap
- | otherwise = callhook delDirHook Nothing change
+
+ dispatch _ change@(Deleted _) =
+ callhook delHook Nothing change
+ dispatch _ change@(DeletedDir _) =
+ callhook delDirHook Nothing change
+ dispatch dirmap change@(Added _) =
+ withstatus change $ dispatchadd dirmap
+
dispatchadd dirmap change s
| Files.isSymbolicLink s =
callhook addSymlinkHook (Just s) change
@@ -237,12 +254,15 @@ runHooks kq hooks = do
| Files.isRegularFile s =
callhook addHook (Just s) change
| otherwise = noop
+
recursiveadd dirmap change = do
let contents = findDirContents dirmap $ changedFile change
forM_ contents $ \f ->
withstatus (Added f) $ dispatchadd dirmap
+
callhook h s change = case h hooks of
Nothing -> noop
Just a -> a (changedFile change) s
+
withstatus change a = maybe noop (a change) =<<
(catchMaybeIO (getSymbolicLinkStatus (changedFile change)))
diff --git a/Utility/Lsof.hs b/Utility/Lsof.hs
index 0061dfe57..ce6a16283 100644
--- a/Utility/Lsof.hs
+++ b/Utility/Lsof.hs
@@ -33,11 +33,11 @@ queryDir path = query ["+d", path]
- Note: If lsof is not available, this always returns [] !
-}
query :: [String] -> IO [(FilePath, LsofOpenMode, ProcessInfo)]
-query opts = do
- (pid, s) <- pipeFrom "lsof" ("-F0can" : opts)
- let !r = parse s
- void $ getProcessStatus True False $ processID pid
- return r
+query opts =
+ withHandle StdoutHandle (createProcessChecked checkSuccessProcess) p $ \h -> do
+ parse <$> hGetContentsStrict h
+ where
+ p = proc "lsof" ("-F0can" : opts)
{- Parsing null-delimited output like:
-
diff --git a/Utility/Misc.hs b/Utility/Misc.hs
index 3b359139b..e11586467 100644
--- a/Utility/Misc.hs
+++ b/Utility/Misc.hs
@@ -33,7 +33,7 @@ separate c l = unbreak $ break c l
| otherwise = (a, tail b)
{- Breaks out the first line. -}
-firstLine :: String-> String
+firstLine :: String -> String
firstLine = takeWhile (/= '\n')
{- Splits a list into segments that are delimited by items matching
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
new file mode 100644
index 000000000..fcab2a90a
--- /dev/null
+++ b/Utility/Parallel.hs
@@ -0,0 +1,35 @@
+{- parallel processing via threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Utility.Parallel where
+
+import Common
+
+import Control.Concurrent
+import Control.Exception
+
+{- Runs an action in parallel with a set of values, in a set of threads.
+ - In order for the actions to truely run in parallel, requires GHC's
+ - threaded runtime,
+ -
+ - Returns the values partitioned into ones with which the action succeeded,
+ - and ones with which it failed. -}
+inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v])
+inParallel a l = do
+ mvars <- mapM thread l
+ statuses <- mapM takeMVar mvars
+ return $ reduce $ partition snd $ zip l statuses
+ where
+ reduce (x,y) = (map fst x, map fst y)
+ thread v = do
+ mvar <- newEmptyMVar
+ _ <- forkIO $ do
+ r <- try (a v) :: IO (Either SomeException ())
+ case r of
+ Left _ -> putMVar mvar False
+ Right _ -> putMVar mvar True
+ return mvar
diff --git a/Utility/Process.hs b/Utility/Process.hs
new file mode 100644
index 000000000..3b293df4f
--- /dev/null
+++ b/Utility/Process.hs
@@ -0,0 +1,214 @@
+{- System.Process enhancements, including additional ways of running
+ - processes, and logging.
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE Rank2Types #-}
+
+module Utility.Process (
+ module X,
+ CreateProcess,
+ StdHandle(..),
+ readProcessEnv,
+ forceSuccessProcess,
+ checkSuccessProcess,
+ createProcessSuccess,
+ createProcessChecked,
+ createBackgroundProcess,
+ withHandle,
+ withBothHandles,
+ createProcess,
+ runInteractiveProcess,
+ writeReadProcess,
+ readProcess
+) where
+
+import qualified System.Process
+import System.Process as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess)
+import System.Process hiding (createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode)
+import System.Exit
+import System.IO
+import System.Log.Logger
+
+import Utility.Misc
+
+type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a
+
+data StdHandle = StdinHandle | StdoutHandle | StderrHandle
+ deriving (Eq)
+
+{- Like readProcess, but allows specifying the environment, and does
+ - not mess with stdin. -}
+readProcessEnv :: FilePath -> [String] -> Maybe [(String, String)] -> IO String
+readProcessEnv cmd args environ =
+ withHandle StdoutHandle createProcessSuccess p $ \h -> do
+ output <- hGetContentsStrict h
+ hClose h
+ return output
+ where
+ p = (proc cmd args)
+ { std_out = CreatePipe
+ , env = environ
+ }
+
+{- Waits for a ProcessHandle, and throws an exception if the process
+ - did not exit successfully. -}
+forceSuccessProcess :: CreateProcess -> ProcessHandle -> IO ()
+forceSuccessProcess p pid = do
+ code <- waitForProcess pid
+ case code of
+ ExitSuccess -> return ()
+ ExitFailure n -> error $ showCmd p ++ " exited " ++ show n
+
+{- Waits for a ProcessHandle and returns True if it exited successfully. -}
+checkSuccessProcess :: ProcessHandle -> IO Bool
+checkSuccessProcess pid = do
+ code <- waitForProcess pid
+ return $ code == ExitSuccess
+
+{- Runs createProcess, then an action on its handles, and then
+ - forceSuccessProcess. -}
+createProcessSuccess :: CreateProcessRunner
+createProcessSuccess p a = createProcessChecked (forceSuccessProcess p) p a
+
+{- Runs createProcess, then an action on its handles, and then
+ - an action on its exit code. -}
+createProcessChecked :: (ProcessHandle -> IO b) -> CreateProcessRunner
+createProcessChecked checker p a = do
+ t@(_, _, _, pid) <- createProcess p
+ r <- a t
+ _ <- checker pid
+ return r
+
+{- Leaves the process running, suitable for lazy streaming.
+ - Note: Zombies will result, and must be waited on. -}
+createBackgroundProcess :: CreateProcessRunner
+createBackgroundProcess p a = a =<< createProcess p
+
+{- Runs a CreateProcessRunner, on a CreateProcess structure, that
+ - is adjusted to pipe only from/to a single StdHandle, and passes
+ - the resulting Handle to an action. -}
+withHandle
+ :: StdHandle
+ -> CreateProcessRunner
+ -> CreateProcess
+ -> (Handle -> IO a)
+ -> IO a
+withHandle h creator p a = creator p' $ a . select
+ where
+ base = p
+ { std_in = Inherit
+ , std_out = Inherit
+ , std_err = Inherit
+ }
+ (select, p')
+ | h == StdinHandle =
+ (stdinHandle, base { std_in = CreatePipe })
+ | h == StdoutHandle =
+ (stdoutHandle, base { std_out = CreatePipe })
+ | h == StderrHandle =
+ (stderrHandle, base { std_err = CreatePipe })
+
+{- Like withHandle, but passes (stdin, stdout) handles to the action. -}
+withBothHandles
+ :: CreateProcessRunner
+ -> CreateProcess
+ -> ((Handle, Handle) -> IO a)
+ -> IO a
+withBothHandles creator p a = creator p' $ a . bothHandles
+ where
+ p' = p
+ { std_in = CreatePipe
+ , std_out = CreatePipe
+ , std_err = Inherit
+ }
+
+{- Extract a desired handle from createProcess's tuple.
+ - These partial functions are safe as long as createProcess is run
+ - with appropriate parameters to set up the desired handle.
+ - Get it wrong and the runtime crash will always happen, so should be
+ - easily noticed. -}
+type HandleExtractor = (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> Handle
+stdinHandle :: HandleExtractor
+stdinHandle (Just h, _, _, _) = h
+stdinHandle _ = error "expected stdinHandle"
+stdoutHandle :: HandleExtractor
+stdoutHandle (_, Just h, _, _) = h
+stdoutHandle _ = error "expected stdoutHandle"
+stderrHandle :: HandleExtractor
+stderrHandle (_, _, Just h, _) = h
+stderrHandle _ = error "expected stderrHandle"
+bothHandles :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> (Handle, Handle)
+bothHandles (Just hin, Just hout, _, _) = (hin, hout)
+bothHandles _ = error "expected bothHandles"
+
+{- Debugging trace for a CreateProcess. -}
+debugProcess :: CreateProcess -> IO ()
+debugProcess p = do
+ debugM "Utility.Process" $ unwords
+ [ action ++ ":"
+ , showCmd p
+ , maybe "" show (env p)
+ ]
+ where
+ action
+ | piped (std_in p) && piped (std_out p) = "chat"
+ | piped (std_in p) = "feed"
+ | piped (std_out p) = "read"
+ | otherwise = "call"
+ piped Inherit = False
+ piped _ = True
+
+{- Shows the command that a CreateProcess will run. -}
+showCmd :: CreateProcess -> String
+showCmd = go . cmdspec
+ where
+ go (ShellCommand s) = s
+ go (RawCommand c ps) = c ++ " " ++ show ps
+
+{- Wrappers for System.Process functions that do debug logging.
+ -
+ - More could be added, but these are the only ones I usually need.
+ -}
+
+createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
+createProcess p = do
+ debugProcess p
+ System.Process.createProcess p
+
+runInteractiveProcess
+ :: FilePath
+ -> [String]
+ -> Maybe FilePath
+ -> Maybe [(String, String)]
+ -> IO (Handle, Handle, Handle, ProcessHandle)
+runInteractiveProcess f args c e = do
+ debugProcess $ (proc f args)
+ { std_in = CreatePipe
+ , std_out = CreatePipe
+ , std_err = CreatePipe
+ }
+ System.Process.runInteractiveProcess f args c e
+
+{- I think this is a more descriptive name than System.Process.readProcess. -}
+writeReadProcess
+ :: FilePath
+ -> [String]
+ -> String
+ -> IO String
+writeReadProcess f args input = do
+ debugProcess $ (proc f args) { std_out = CreatePipe, std_in = CreatePipe }
+ System.Process.readProcess f args input
+
+{- Normally, when reading from a process, it does not need to be fed any
+ - input. -}
+readProcess
+ :: FilePath
+ -> [String]
+ -> IO String
+readProcess f args = do
+ debugProcess $ (proc f args) { std_out = CreatePipe }
+ System.Process.readProcess f args []
diff --git a/Utility/SafeCommand.hs b/Utility/SafeCommand.hs
index aedf27137..19dd707b8 100644
--- a/Utility/SafeCommand.hs
+++ b/Utility/SafeCommand.hs
@@ -1,6 +1,6 @@
{- safely running shell commands
-
- - Copyright 2010-2011 Joey Hess <joey@kitenet.net>
+ - Copyright 2010-2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -8,11 +8,9 @@
module Utility.SafeCommand where
import System.Exit
-import qualified System.Posix.Process
-import System.Posix.Process hiding (executeFile)
-import System.Posix.Signals
+import Utility.Process
+import System.Process (env)
import Data.String.Utils
-import System.Log.Logger
import Control.Applicative
{- A type for parameters passed to a shell command. A command can
@@ -42,7 +40,7 @@ boolSystem :: FilePath -> [CommandParam] -> IO Bool
boolSystem command params = boolSystemEnv command params Nothing
boolSystemEnv :: FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
-boolSystemEnv command params env = dispatch <$> safeSystemEnv command params env
+boolSystemEnv command params environ = dispatch <$> safeSystemEnv command params environ
where
dispatch ExitSuccess = True
dispatch _ = False
@@ -51,36 +49,13 @@ boolSystemEnv command params env = dispatch <$> safeSystemEnv command params env
safeSystem :: FilePath -> [CommandParam] -> IO ExitCode
safeSystem command params = safeSystemEnv command params Nothing
-{- SIGINT(ctrl-c) is allowed to propigate and will terminate the program. -}
+{- Unlike many implementations of system, SIGINT(ctrl-c) is allowed
+ - to propigate and will terminate the program. -}
safeSystemEnv :: FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO ExitCode
-safeSystemEnv command params env = do
- -- Going low-level because all the high-level system functions
- -- block SIGINT etc. We need to block SIGCHLD, but allow
- -- SIGINT to do its default program termination.
- let sigset = addSignal sigCHLD emptySignalSet
- oldint <- installHandler sigINT Default Nothing
- oldset <- getSignalMask
- blockSignals sigset
- childpid <- forkProcess $ childaction oldint oldset
- mps <- getProcessStatus True False childpid
- restoresignals oldint oldset
- case mps of
- Just (Exited code) -> return code
- _ -> error $ "unknown error running " ++ command
- where
- restoresignals oldint oldset = do
- _ <- installHandler sigINT oldint Nothing
- setSignalMask oldset
- childaction oldint oldset = do
- restoresignals oldint oldset
- executeFile command True (toCommand params) env
-
-{- executeFile with debug logging -}
-executeFile :: FilePath -> Bool -> [String] -> Maybe [(String, String)] -> IO ()
-executeFile c path p e = do
- debugM "Utility.SafeCommand.executeFile" $
- "Running: " ++ c ++ " " ++ show p ++ " " ++ maybe "" show e
- System.Posix.Process.executeFile c path p e
+safeSystemEnv command params environ = do
+ (_, _, _, pid) <- createProcess (proc command $ toCommand params)
+ { env = environ }
+ waitForProcess pid
{- Escapes a filename or other parameter to be safely able to be exposed to
- the shell. -}
diff --git a/Utility/TSet.hs b/Utility/TSet.hs
new file mode 100644
index 000000000..24d345477
--- /dev/null
+++ b/Utility/TSet.hs
@@ -0,0 +1,39 @@
+{- Transactional sets
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Utility.TSet where
+
+import Common
+
+import Control.Concurrent.STM
+
+type TSet = TChan
+
+runTSet :: STM a -> IO a
+runTSet = atomically
+
+newTSet :: IO (TSet a)
+newTSet = atomically newTChan
+
+{- Gets the contents of the TSet. Blocks until at least one item is
+ - present. -}
+getTSet :: TSet a -> IO [a]
+getTSet tset = runTSet $ do
+ c <- readTChan tset
+ go [c]
+ where
+ go l = do
+ v <- tryReadTChan tset
+ case v of
+ Nothing -> return l
+ Just c -> go (c:l)
+
+{- Puts items into a TSet. -}
+putTSet :: TSet a -> [a] -> IO ()
+putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs
+
+{- Put a single item into a TSet. -}
+putTSet1 :: TSet a -> a -> IO ()
+putTSet1 tset v = void $ runTSet $ writeTChan tset v
diff --git a/Utility/TempFile.hs b/Utility/TempFile.hs
index 4dcbf1cca..62e0fc859 100644
--- a/Utility/TempFile.hs
+++ b/Utility/TempFile.hs
@@ -9,7 +9,7 @@ module Utility.TempFile where
import Control.Exception (bracket)
import System.IO
-import System.Posix.Process hiding (executeFile)
+import System.Posix.Process
import System.Directory
import Utility.Exception
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index c828a0593..ba7eae6a1 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
+
+mkWatchHooks :: WatchHooks
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index 9b3e3b08e..12dc0aa3e 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -35,6 +35,12 @@ all the other git clones, at both the git level and the key/value level.
(will need to use `GIT_SSH`, which needs to point to a command to run,
not a shell command line)
+## misc todo
+
+* --debug will show often unnecessary work being done. Optimise.
+* It would be nice if, when a USB drive is connected,
+ syncing starts automatically. Use dbus on Linux?
+
## data syncing
There are two parts to data syncing. First, map the network and second,
diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn
index c52a5f3bf..85a5a18f0 100644
--- a/doc/git-annex.mdwn
+++ b/doc/git-annex.mdwn
@@ -185,6 +185,10 @@ subdirectories).
To not daemonize, run with --foreground ; to stop a running daemon,
run with --stop
+* assistant
+
+ Like watch, but also automatically syncs changes to other remotes.
+
# REPOSITORY SETUP COMMANDS
* init [description]
diff --git a/doc/todo/assistant_threaded_runtime.mdwn b/doc/todo/assistant_threaded_runtime.mdwn
index 3953cf062..03ba66acf 100644
--- a/doc/todo/assistant_threaded_runtime.mdwn
+++ b/doc/todo/assistant_threaded_runtime.mdwn
@@ -28,6 +28,9 @@ git-annex does not otherwise use threads, so this is surprising. --[[Joey]]
> I've spent a lot of time debugging this, and trying to fix it, in the
> "threaded" branch. There are still deadlocks. --[[Joey]]
+>> Fixed, by switching from `System.Cmd.Utils` to `System.Process`
+>> --[[Joey]]
+
---
It would be possible to not use the threaded runtime. Instead, we could
diff --git a/git-annex.cabal b/git-annex.cabal
index 0bd35e14f..e58bd4d95 100644
--- a/git-annex.cabal
+++ b/git-annex.cabal
@@ -1,5 +1,5 @@
Name: git-annex
-Version: 3.20120629
+Version: 3.20120630
Cabal-Version: >= 1.8
License: GPL
Maintainer: Joey Hess <joey@kitenet.net>
@@ -40,11 +40,12 @@ Executable git-annex
unix, containers, utf8-string, network, mtl, bytestring, old-locale, time,
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base,
- IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance
+ IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process
-- Need to list this because it's generated from a .hsc file.
Other-Modules: Utility.Touch
C-Sources: Utility/libdiskfree.c
Extensions: CPP
+ GHC-Options: -threaded
if flag(S3)
Build-Depends: hS3
@@ -65,10 +66,11 @@ Test-Suite test
unix, containers, utf8-string, network, mtl, bytestring, old-locale, time,
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base,
- IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance
+ IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process
Other-Modules: Utility.Touch
C-Sources: Utility/libdiskfree.c
Extensions: CPP
+ GHC-Options: -threaded
source-repository head
type: git
diff --git a/test.hs b/test.hs
index 9de73264e..a377057c2 100644
--- a/test.hs
+++ b/test.hs
@@ -14,6 +14,7 @@ import Test.QuickCheck
import System.Posix.Directory (changeWorkingDirectory)
import System.Posix.Files
import System.Posix.Env
+import System.Posix.Process
import Control.Exception.Extensible
import qualified Data.Map as M
import System.IO.HVFS (SystemFS(..))