aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-06-22 13:39:44 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-06-22 14:10:25 -0400
commit28e28bc0436cb0a33e570b1a1f678e80a770a21a (patch)
treed9acb7b66a19a64d6108f980c081f2537c9af353
parent3ee44cf8feb11fc439c02eb0eb8f12d290b01120 (diff)
stub syncer thread and commit channel
-rw-r--r--Assistant.hs17
-rw-r--r--Assistant/Changes.hs25
-rw-r--r--Assistant/Commits.hs32
-rw-r--r--Assistant/Committer.hs6
-rw-r--r--Assistant/Syncer.hs29
-rw-r--r--Assistant/ThreadedMonad.hs3
-rw-r--r--Assistant/Watcher.hs4
-rw-r--r--Utility/TSet.hs39
8 files changed, 125 insertions, 30 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 33c7cef36..5a3fa5a9d 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -22,9 +22,11 @@
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- - Thread 6: status logger
+ - Thread 6: syncer
+ - Waits for commits to be made, and syncs the git repo to remotes.
+ - Thread 7: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 8: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
@@ -47,8 +49,10 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
+import Assistant.Commits
import Assistant.Watcher
import Assistant.Committer
+import Assistant.Syncer
import Assistant.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
@@ -70,12 +74,9 @@ startDaemon assistant foreground
dstatus <- startDaemonStatus
liftIO $ a $ do
changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
+ commitchan <- newCommitChan
+ _ <- forkIO $ commitThread st changechan commitchan
+ _ <- forkIO $ syncThread st commitchan
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return.
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..47eae83ef 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -8,14 +8,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +29,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +62,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..152544e7c
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,32 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs
index 63df8cafc..acdee1408 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Committer.hs
@@ -7,6 +7,7 @@ module Assistant.Committer where
import Common.Annex
import Assistant.Changes
+import Assistant.Commits
import Assistant.ThreadedMonad
import Assistant.Watcher
import qualified Annex
@@ -26,8 +27,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
+commitThread st changechan commitchan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -40,6 +41,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes
diff --git a/Assistant/Syncer.hs b/Assistant/Syncer.hs
new file mode 100644
index 000000000..059859c07
--- /dev/null
+++ b/Assistant/Syncer.hs
@@ -0,0 +1,29 @@
+{- git-annex assistant git syncing thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.Syncer where
+
+import Assistant.Commits
+import Assistant.ThreadedMonad
+import qualified Command.Sync
+import Utility.ThreadScheduler
+
+{- This thread syncs git commits out to remotes. -}
+syncThread :: ThreadState -> CommitChan -> IO ()
+syncThread st commitchan = runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to sync.
+ if shouldSync commits
+ then syncToRemotes
+ else refillCommits commitchan commits
+
+{- Decide if now is a good time to sync commits to remotes. -}
+shouldSync :: [Commit] -> Bool
+shouldSync commits = not (null commits)
+
+syncToRemotes :: IO ()
+syncToRemotes = return () -- TOOD
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..91a311dee 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -32,7 +32,8 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
runThreadState mvar a = do
startstate <- takeMVar mvar
diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs
index db58f01e8..78330c8d0 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Watcher.hs
@@ -27,7 +27,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
go = runThreadState st $ handler file filestatus dstatus
diff --git a/Utility/TSet.hs b/Utility/TSet.hs
new file mode 100644
index 000000000..24d345477
--- /dev/null
+++ b/Utility/TSet.hs
@@ -0,0 +1,39 @@
+{- Transactional sets
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Utility.TSet where
+
+import Common
+
+import Control.Concurrent.STM
+
+type TSet = TChan
+
+runTSet :: STM a -> IO a
+runTSet = atomically
+
+newTSet :: IO (TSet a)
+newTSet = atomically newTChan
+
+{- Gets the contents of the TSet. Blocks until at least one item is
+ - present. -}
+getTSet :: TSet a -> IO [a]
+getTSet tset = runTSet $ do
+ c <- readTChan tset
+ go [c]
+ where
+ go l = do
+ v <- tryReadTChan tset
+ case v of
+ Nothing -> return l
+ Just c -> go (c:l)
+
+{- Puts items into a TSet. -}
+putTSet :: TSet a -> [a] -> IO ()
+putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs
+
+{- Put a single item into a TSet. -}
+putTSet1 :: TSet a -> a -> IO ()
+putTSet1 tset v = void $ runTSet $ writeTChan tset v