diff options
author | Joey Hess <joey@kitenet.net> | 2012-06-22 13:39:44 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-06-22 14:10:25 -0400 |
commit | 28e28bc0436cb0a33e570b1a1f678e80a770a21a (patch) | |
tree | d9acb7b66a19a64d6108f980c081f2537c9af353 | |
parent | 3ee44cf8feb11fc439c02eb0eb8f12d290b01120 (diff) |
stub syncer thread and commit channel
-rw-r--r-- | Assistant.hs | 17 | ||||
-rw-r--r-- | Assistant/Changes.hs | 25 | ||||
-rw-r--r-- | Assistant/Commits.hs | 32 | ||||
-rw-r--r-- | Assistant/Committer.hs | 6 | ||||
-rw-r--r-- | Assistant/Syncer.hs | 29 | ||||
-rw-r--r-- | Assistant/ThreadedMonad.hs | 3 | ||||
-rw-r--r-- | Assistant/Watcher.hs | 4 | ||||
-rw-r--r-- | Utility/TSet.hs | 39 |
8 files changed, 125 insertions, 30 deletions
diff --git a/Assistant.hs b/Assistant.hs index 33c7cef36..5a3fa5a9d 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -22,9 +22,11 @@ - Thread 5: committer - Waits for changes to occur, and runs the git queue to update its - index, then commits. - - Thread 6: status logger + - Thread 6: syncer + - Waits for commits to be made, and syncs the git repo to remotes. + - Thread 7: status logger - Wakes up periodically and records the daemon's status to disk. - - Thread 7: sanity checker + - Thread 8: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) @@ -47,8 +49,10 @@ import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes +import Assistant.Commits import Assistant.Watcher import Assistant.Committer +import Assistant.Syncer import Assistant.SanityChecker import qualified Utility.Daemon import Utility.LogFile @@ -70,12 +74,9 @@ startDaemon assistant foreground dstatus <- startDaemonStatus liftIO $ a $ do changechan <- newChangeChan - -- The commit thread is started early, - -- so that the user can immediately - -- begin adding files and having them - -- committed, even while the startup scan - -- is taking place. - _ <- forkIO $ commitThread st changechan + commitchan <- newCommitChan + _ <- forkIO $ commitThread st changechan commitchan + _ <- forkIO $ syncThread st commitchan _ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ sanityCheckerThread st dstatus changechan -- Does not return. diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index 173ba1922..47eae83ef 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -8,14 +8,14 @@ module Assistant.Changes where import Common.Annex import qualified Annex.Queue import Types.KeySource +import Utility.TSet -import Control.Concurrent.STM import Data.Time.Clock data ChangeType = AddChange | LinkChange | RmChange | RmDirChange deriving (Show, Eq) -type ChangeChan = TChan Change +type ChangeChan = TSet Change data Change = Change @@ -29,11 +29,8 @@ data Change } deriving (Show) -runChangeChan :: STM a -> IO a -runChangeChan = atomically - newChangeChan :: IO ChangeChan -newChangeChan = atomically newTChan +newChangeChan = newTSet {- Handlers call this when they made a change that needs to get committed. -} madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) @@ -65,17 +62,13 @@ finishedChange c = c {- Gets all unhandled changes. - Blocks until at least one change is made. -} getChanges :: ChangeChan -> IO [Change] -getChanges chan = runChangeChan $ do - c <- readTChan chan - go [c] - where - go l = do - v <- tryReadTChan chan - case v of - Nothing -> return l - Just c -> go (c:l) +getChanges = getTSet {- Puts unhandled changes back into the channel. - Note: Original order is not preserved. -} refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs +refillChanges = putTSet + +{- Records a change in the channel. -} +recordChange :: ChangeChan -> Change -> IO () +recordChange = putTSet1 diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs new file mode 100644 index 000000000..152544e7c --- /dev/null +++ b/Assistant/Commits.hs @@ -0,0 +1,32 @@ +{- git-annex assistant commit tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.Commits where + +import Utility.TSet + +import Data.Time.Clock + +type CommitChan = TSet Commit + +data Commit = Commit UTCTime + deriving (Show) + +newCommitChan :: IO CommitChan +newCommitChan = newTSet + +{- Gets all unhandled commits. + - Blocks until at least one commit is made. -} +getCommits :: CommitChan -> IO [Commit] +getCommits = getTSet + +{- Puts unhandled commits back into the channel. + - Note: Original order is not preserved. -} +refillCommits :: CommitChan -> [Commit] -> IO () +refillCommits = putTSet + +{- Records a commit in the channel. -} +recordCommit :: CommitChan -> Commit -> IO () +recordCommit = putTSet1 diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index 63df8cafc..acdee1408 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -7,6 +7,7 @@ module Assistant.Committer where import Common.Annex import Assistant.Changes +import Assistant.Commits import Assistant.ThreadedMonad import Assistant.Watcher import qualified Annex @@ -26,8 +27,8 @@ import qualified Data.Set as S import Data.Either {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> IO () -commitThread st changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO () +commitThread st changechan commitchan = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. @@ -40,6 +41,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do if shouldCommit time readychanges then do void $ tryIO $ runThreadState st commitStaged + recordCommit commitchan (Commit time) else refillChanges changechan readychanges else refillChanges changechan changes diff --git a/Assistant/Syncer.hs b/Assistant/Syncer.hs new file mode 100644 index 000000000..059859c07 --- /dev/null +++ b/Assistant/Syncer.hs @@ -0,0 +1,29 @@ +{- git-annex assistant git syncing thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Assistant.Syncer where + +import Assistant.Commits +import Assistant.ThreadedMonad +import qualified Command.Sync +import Utility.ThreadScheduler + +{- This thread syncs git commits out to remotes. -} +syncThread :: ThreadState -> CommitChan -> IO () +syncThread st commitchan = runEvery (Seconds 2) $ do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + commits <- getCommits commitchan + -- Now see if now's a good time to sync. + if shouldSync commits + then syncToRemotes + else refillCommits commitchan commits + +{- Decide if now is a good time to sync commits to remotes. -} +shouldSync :: [Commit] -> Bool +shouldSync commits = not (null commits) + +syncToRemotes :: IO () +syncToRemotes = return () -- TOOD diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs index 51f579d07..91a311dee 100644 --- a/Assistant/ThreadedMonad.hs +++ b/Assistant/ThreadedMonad.hs @@ -32,7 +32,8 @@ withThreadState a = do {- Runs an Annex action, using the state from the MVar. - - - This serializes calls by threads. -} + - This serializes calls by threads; only one thread can run in Annex at a + - time. -} runThreadState :: ThreadState -> Annex a -> IO a runThreadState mvar a = do startstate <- takeMVar mvar diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index db58f01e8..78330c8d0 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -27,7 +27,6 @@ import Annex.Content import Annex.CatFile import Git.Types -import Control.Concurrent.STM import Data.Bits.Utils import qualified Data.ByteString.Lazy as L @@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do case r of Left e -> print e Right Nothing -> noop - Right (Just change) -> void $ - runChangeChan $ writeTChan changechan change + Right (Just change) -> recordChange changechan change where go = runThreadState st $ handler file filestatus dstatus diff --git a/Utility/TSet.hs b/Utility/TSet.hs new file mode 100644 index 000000000..24d345477 --- /dev/null +++ b/Utility/TSet.hs @@ -0,0 +1,39 @@ +{- Transactional sets + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Utility.TSet where + +import Common + +import Control.Concurrent.STM + +type TSet = TChan + +runTSet :: STM a -> IO a +runTSet = atomically + +newTSet :: IO (TSet a) +newTSet = atomically newTChan + +{- Gets the contents of the TSet. Blocks until at least one item is + - present. -} +getTSet :: TSet a -> IO [a] +getTSet tset = runTSet $ do + c <- readTChan tset + go [c] + where + go l = do + v <- tryReadTChan tset + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts items into a TSet. -} +putTSet :: TSet a -> [a] -> IO () +putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs + +{- Put a single item into a TSet. -} +putTSet1 :: TSet a -> a -> IO () +putTSet1 tset v = void $ runTSet $ writeTChan tset v |