diff options
Diffstat (limited to 'Assistant/Threads/Committer.hs')
-rw-r--r-- | Assistant/Threads/Committer.hs | 93 |
1 files changed, 73 insertions, 20 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index c07109489..1064f371a 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -52,7 +52,7 @@ commitThread = namedThread "Committer" $ do =<< annexDelayAdd <$> Annex.getGitConfig waitChangeTime $ \(changes, time) -> do readychanges <- handleAdds delayadd changes - if shouldCommit time readychanges + if shouldCommit time (length readychanges) readychanges then do debug [ "committing" @@ -62,8 +62,12 @@ commitThread = namedThread "Committer" $ do void $ alertWhile commitAlert $ liftAnnex commitStaged recordCommit + let numchanges = length readychanges mapM_ checkChangeContent readychanges - else refill readychanges + return numchanges + else do + refill readychanges + return 0 refill :: [Change] -> Assistant () refill [] = noop @@ -72,21 +76,33 @@ refill cs = do refillChanges cs {- Wait for one or more changes to arrive to be committed. -} -waitChangeTime :: (([Change], UTCTime) -> Assistant ()) -> Assistant () -waitChangeTime a = runEvery (Seconds 1) <~> do - -- We already waited one second as a simple rate limiter. - -- Next, wait until at least one change is available for - -- processing. - changes <- getChanges - -- See if now's a good time to commit. - now <- liftIO getCurrentTime - case (shouldCommit now changes, possiblyrename changes) of - (True, False) -> a (changes, now) - (True, True) -> do - morechanges <- getrelatedchanges changes - a (changes ++ morechanges, now) - _ -> refill changes +waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant () +waitChangeTime a = go [] 0 where + go unhandled lastcommitsize = do + -- Wait one one second as a simple rate limiter. + liftIO $ threadDelaySeconds (Seconds 1) + -- Now, wait until at least one change is available for + -- processing. + cs <- getChanges + let changes = unhandled ++ cs + let len = length changes + -- See if now's a good time to commit. + now <- liftIO getCurrentTime + case (lastcommitsize >= maxCommitSize, shouldCommit now len changes, possiblyrename changes) of + (True, True, _) + | len > maxCommitSize -> + go [] =<< a (changes, now) + | otherwise -> aftermaxcommit changes + (_, True, False) -> + go [] =<< a (changes, now) + (_, True, True) -> do + morechanges <- getrelatedchanges changes + go [] =<< a (changes ++ morechanges, now) + _ -> do + refill changes + go [] lastcommitsize + {- Did we perhaps only get one of the AddChange and RmChange pair - that make up a file rename? Or some of the pairs that make up - a directory rename? @@ -116,6 +132,41 @@ waitChangeTime a = runEvery (Seconds 1) <~> do then return cs else getbatchchanges (cs':cs) + {- The last commit was maximum size, so it's very likely there + - are more changes and we'd like to ensure we make another commit + - of maximum size if possible. + - + - But, it can take a while for the Watcher to wake back up + - after a commit. It can get blocked by another thread + - that is using the Annex state, such as a git-annex branch + - commit. Especially after such a large commit, this can + - take several seconds. When this happens, it defeats the + - normal commit batching, which sees some old changes the + - Watcher found while the commit was being prepared, and sees + - no recent ones, and wants to commit immediately. + - + - All that we need to do, then, is wait for the Watcher to + - wake up, and queue up one more change. + - + - However, it's also possible that we're at the end of changes for + - now. So to avoid waiting a really long time before committing + - those changes we have, poll for up to 30 seconds, and then + - commit them. + - + - Also, try to run something in Annex, to ensure we block + - longer if the Annex state is indeed blocked. + -} + aftermaxcommit oldchanges = loop (30 :: Int) + where + loop 0 = go oldchanges 0 + loop n = do + liftAnnex noop -- ensure Annex state is free + liftIO $ threadDelaySeconds (Seconds 1) + changes <- getAnyChanges + if null changes + then loop (n - 1) + else go (oldchanges ++ changes) 0 + isRmChange :: Change -> Bool isRmChange (Change { changeInfo = i }) | i == RmChange = True isRmChange _ = False @@ -131,20 +182,22 @@ humanImperceptibleDelay :: IO () humanImperceptibleDelay = threadDelay $ truncate $ humanImperceptibleInterval * fromIntegral oneSecond +maxCommitSize :: Int +maxCommitSize = 5000 + {- Decide if now is a good time to make a commit. - Note that the list of changes has an undefined order. - - Current strategy: If there have been 10 changes within the past second, - a batch activity is taking place, so wait for later. -} -shouldCommit :: UTCTime -> [Change] -> Bool -shouldCommit now changes +shouldCommit :: UTCTime -> Int -> [Change] -> Bool +shouldCommit now len changes | len == 0 = False - | len > 5000 = True -- avoid bloating change pool too much + | len >= maxCommitSize = True | length recentchanges < 10 = True | otherwise = False -- batch activity where - len = length changes thissecond c = timeDelta c <= 1 recentchanges = filter thissecond changes timeDelta c = now `diffUTCTime` changeTime c |