summaryrefslogtreecommitdiff
path: root/Assistant/Threads/Committer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/Threads/Committer.hs')
-rw-r--r--Assistant/Threads/Committer.hs93
1 files changed, 73 insertions, 20 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
index c07109489..1064f371a 100644
--- a/Assistant/Threads/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -52,7 +52,7 @@ commitThread = namedThread "Committer" $ do
=<< annexDelayAdd <$> Annex.getGitConfig
waitChangeTime $ \(changes, time) -> do
readychanges <- handleAdds delayadd changes
- if shouldCommit time readychanges
+ if shouldCommit time (length readychanges) readychanges
then do
debug
[ "committing"
@@ -62,8 +62,12 @@ commitThread = namedThread "Committer" $ do
void $ alertWhile commitAlert $
liftAnnex commitStaged
recordCommit
+ let numchanges = length readychanges
mapM_ checkChangeContent readychanges
- else refill readychanges
+ return numchanges
+ else do
+ refill readychanges
+ return 0
refill :: [Change] -> Assistant ()
refill [] = noop
@@ -72,21 +76,33 @@ refill cs = do
refillChanges cs
{- Wait for one or more changes to arrive to be committed. -}
-waitChangeTime :: (([Change], UTCTime) -> Assistant ()) -> Assistant ()
-waitChangeTime a = runEvery (Seconds 1) <~> do
- -- We already waited one second as a simple rate limiter.
- -- Next, wait until at least one change is available for
- -- processing.
- changes <- getChanges
- -- See if now's a good time to commit.
- now <- liftIO getCurrentTime
- case (shouldCommit now changes, possiblyrename changes) of
- (True, False) -> a (changes, now)
- (True, True) -> do
- morechanges <- getrelatedchanges changes
- a (changes ++ morechanges, now)
- _ -> refill changes
+waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant ()
+waitChangeTime a = go [] 0
where
+ go unhandled lastcommitsize = do
+ -- Wait one one second as a simple rate limiter.
+ liftIO $ threadDelaySeconds (Seconds 1)
+ -- Now, wait until at least one change is available for
+ -- processing.
+ cs <- getChanges
+ let changes = unhandled ++ cs
+ let len = length changes
+ -- See if now's a good time to commit.
+ now <- liftIO getCurrentTime
+ case (lastcommitsize >= maxCommitSize, shouldCommit now len changes, possiblyrename changes) of
+ (True, True, _)
+ | len > maxCommitSize ->
+ go [] =<< a (changes, now)
+ | otherwise -> aftermaxcommit changes
+ (_, True, False) ->
+ go [] =<< a (changes, now)
+ (_, True, True) -> do
+ morechanges <- getrelatedchanges changes
+ go [] =<< a (changes ++ morechanges, now)
+ _ -> do
+ refill changes
+ go [] lastcommitsize
+
{- Did we perhaps only get one of the AddChange and RmChange pair
- that make up a file rename? Or some of the pairs that make up
- a directory rename?
@@ -116,6 +132,41 @@ waitChangeTime a = runEvery (Seconds 1) <~> do
then return cs
else getbatchchanges (cs':cs)
+ {- The last commit was maximum size, so it's very likely there
+ - are more changes and we'd like to ensure we make another commit
+ - of maximum size if possible.
+ -
+ - But, it can take a while for the Watcher to wake back up
+ - after a commit. It can get blocked by another thread
+ - that is using the Annex state, such as a git-annex branch
+ - commit. Especially after such a large commit, this can
+ - take several seconds. When this happens, it defeats the
+ - normal commit batching, which sees some old changes the
+ - Watcher found while the commit was being prepared, and sees
+ - no recent ones, and wants to commit immediately.
+ -
+ - All that we need to do, then, is wait for the Watcher to
+ - wake up, and queue up one more change.
+ -
+ - However, it's also possible that we're at the end of changes for
+ - now. So to avoid waiting a really long time before committing
+ - those changes we have, poll for up to 30 seconds, and then
+ - commit them.
+ -
+ - Also, try to run something in Annex, to ensure we block
+ - longer if the Annex state is indeed blocked.
+ -}
+ aftermaxcommit oldchanges = loop (30 :: Int)
+ where
+ loop 0 = go oldchanges 0
+ loop n = do
+ liftAnnex noop -- ensure Annex state is free
+ liftIO $ threadDelaySeconds (Seconds 1)
+ changes <- getAnyChanges
+ if null changes
+ then loop (n - 1)
+ else go (oldchanges ++ changes) 0
+
isRmChange :: Change -> Bool
isRmChange (Change { changeInfo = i }) | i == RmChange = True
isRmChange _ = False
@@ -131,20 +182,22 @@ humanImperceptibleDelay :: IO ()
humanImperceptibleDelay = threadDelay $
truncate $ humanImperceptibleInterval * fromIntegral oneSecond
+maxCommitSize :: Int
+maxCommitSize = 5000
+
{- Decide if now is a good time to make a commit.
- Note that the list of changes has an undefined order.
-
- Current strategy: If there have been 10 changes within the past second,
- a batch activity is taking place, so wait for later.
-}
-shouldCommit :: UTCTime -> [Change] -> Bool
-shouldCommit now changes
+shouldCommit :: UTCTime -> Int -> [Change] -> Bool
+shouldCommit now len changes
| len == 0 = False
- | len > 5000 = True -- avoid bloating change pool too much
+ | len >= maxCommitSize = True
| length recentchanges < 10 = True
| otherwise = False -- batch activity
where
- len = length changes
thissecond c = timeDelta c <= 1
recentchanges = filter thissecond changes
timeDelta c = now `diffUTCTime` changeTime c