diff options
-rw-r--r-- | Annex/ChangedRefs.hs | 105 | ||||
-rw-r--r-- | Command/NotifyChanges.hs | 48 | ||||
-rw-r--r-- | P2P/Annex.hs | 9 | ||||
-rw-r--r-- | P2P/Protocol.hs | 13 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh.hs | 3 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh/Types.hs | 4 | ||||
-rw-r--r-- | RemoteDaemon/Types.hs | 10 | ||||
-rw-r--r-- | debian/control | 1 | ||||
-rw-r--r-- | git-annex.cabal | 2 |
9 files changed, 142 insertions, 53 deletions
diff --git a/Annex/ChangedRefs.hs b/Annex/ChangedRefs.hs new file mode 100644 index 000000000..0dc82d3b3 --- /dev/null +++ b/Annex/ChangedRefs.hs @@ -0,0 +1,105 @@ +{- Waiting for changed git refs + - + - Copyright 2014-216 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Annex.ChangedRefs + ( ChangedRefs(..) + , ChangedRefsHandle + , waitChangedRefs + , drainChangedRefs + , stopWatchingChangedRefs + , watchChangedRefs + ) where + +import Annex.Common +import Utility.DirWatcher +import Utility.DirWatcher.Types +import qualified Git +import Git.Sha +import qualified Utility.SimpleProtocol as Proto + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.TBMChan + +newtype ChangedRefs = ChangedRefs [Git.Ref] + deriving (Show) + +instance Proto.Serializable ChangedRefs where + serialize (ChangedRefs l) = unwords $ map Git.fromRef l + deserialize = Just . ChangedRefs . map Git.Ref . words + +data ChangedRefsHandle = ChangedRefsHandle DirWatcherHandle (TBMChan Git.Sha) + +-- | Wait for one or more git refs to change. +-- +-- When possible, coalesce ref writes that occur closely together +-- in time. Delay up to 0.05 seconds to get more ref writes. +waitChangedRefs :: ChangedRefsHandle -> IO ChangedRefs +waitChangedRefs (ChangedRefsHandle _ chan) = do + v <- atomically $ readTBMChan chan + case v of + Nothing -> return $ ChangedRefs [] + Just r -> do + threadDelay 50000 + rs <- atomically $ loop [] + return $ ChangedRefs (r:rs) + where + loop rs = do + v <- tryReadTBMChan chan + case v of + Just (Just r) -> loop (r:rs) + _ -> return rs + +-- | Remove any changes that might be buffered in the channel, +-- without waiting for any new changes. +drainChangedRefs :: ChangedRefsHandle -> IO () +drainChangedRefs (ChangedRefsHandle _ chan) = atomically go + where + go = do + v <- tryReadTBMChan chan + case v of + Just (Just _) -> go + _ -> return () + +stopWatchingChangedRefs :: ChangedRefsHandle -> IO () +stopWatchingChangedRefs h@(ChangedRefsHandle wh chan) = do + stopWatchDir wh + atomically $ closeTBMChan chan + drainChangedRefs h + +watchChangedRefs :: Annex ChangedRefsHandle +watchChangedRefs = do + -- This channel is used to accumulate notifications, + -- because the DirWatcher might have multiple threads that find + -- changes at the same time. It is bounded to allow a watcher + -- to be started once and reused, without too many changes being + -- buffered in memory. + chan <- liftIO $ newTBMChanIO 100 + + g <- gitRepo + let refdir = Git.localGitDir g </> "refs" + liftIO $ createDirectoryIfMissing True refdir + + let notifyhook = Just $ notifyHook chan + let hooks = mkWatchHooks + { addHook = notifyhook + , modifyHook = notifyhook + } + + h <- liftIO $ watchDir refdir (const False) True hooks id + return $ ChangedRefsHandle h chan + +notifyHook :: TBMChan Git.Sha -> FilePath -> Maybe FileStatus -> IO () +notifyHook chan reffile _ + | ".lock" `isSuffixOf` reffile = noop + | otherwise = void $ do + sha <- catchDefaultIO Nothing $ + extractSha <$> readFile reffile + -- When the channel is full, there is probably no reader + -- running, or ref changes have been occuring very fast, + -- so it's ok to not write the change to it. + maybe noop (void . atomically . tryWriteTBMChan chan) sha diff --git a/Command/NotifyChanges.hs b/Command/NotifyChanges.hs index bb9b10eee..83d7bca3f 100644 --- a/Command/NotifyChanges.hs +++ b/Command/NotifyChanges.hs @@ -8,6 +8,7 @@ module Command.NotifyChanges where import Command +import Annex.ChangedRefs import Utility.DirWatcher import Utility.DirWatcher.Types import qualified Git @@ -30,55 +31,18 @@ seek = withNothing start start :: CommandStart start = do - -- This channel is used to accumulate notifcations, - -- because the DirWatcher might have multiple threads that find - -- changes at the same time. - chan <- liftIO newTChanIO - - g <- gitRepo - let refdir = Git.localGitDir g </> "refs" - liftIO $ createDirectoryIfMissing True refdir + h <- watchChangedRefs - let notifyhook = Just $ notifyHook chan - let hooks = mkWatchHooks - { addHook = notifyhook - , modifyHook = notifyhook - } - - void $ liftIO $ watchDir refdir (const False) True hooks id - - let sender = do - send READY - forever $ send . CHANGED =<< drain chan - -- No messages need to be received from the caller, -- but when it closes the connection, notice and terminate. let receiver = forever $ void $ getProtocolLine stdin + let sender = forever $ send . CHANGED =<< waitChangedRefs h + + liftIO $ send READY void $ liftIO $ concurrently sender receiver + liftIO $ stopWatchingChangedRefs h stop -notifyHook :: TChan Git.Sha -> FilePath -> Maybe FileStatus -> IO () -notifyHook chan reffile _ - | ".lock" `isSuffixOf` reffile = noop - | otherwise = void $ do - sha <- catchDefaultIO Nothing $ - extractSha <$> readFile reffile - maybe noop (atomically . writeTChan chan) sha - --- When possible, coalesce ref writes that occur closely together --- in time. Delay up to 0.05 seconds to get more ref writes. -drain :: TChan Git.Sha -> IO [Git.Sha] -drain chan = do - r <- atomically $ readTChan chan - threadDelay 50000 - rs <- atomically $ drain' chan - return (r:rs) - -drain' :: TChan Git.Sha -> STM [Git.Sha] -drain' chan = loop [] - where - loop rs = maybe (return rs) (\r -> loop (r:rs)) =<< tryReadTChan chan - send :: Notification -> IO () send n = do putStrLn $ unwords $ formatMessage n diff --git a/P2P/Annex.hs b/P2P/Annex.hs index d24e65b0f..e9b59652c 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -16,6 +16,7 @@ module P2P.Annex import Annex.Common import Annex.Content import Annex.Transfer +import Annex.ChangedRefs import P2P.Protocol import P2P.IO import Logs.Location @@ -114,6 +115,14 @@ runLocal runmode runner a = case a of protoaction False next Right _ -> runner next + WaitRefChange next -> do + v <- tryNonAsync $ bracket + watchChangedRefs + (liftIO . stopWatchingChangedRefs) + (liftIO . waitChangedRefs) + case v of + Left e -> return (Left (show e)) + Right changedrefs -> runner (next changedrefs) where transfer mk k af ta = case runmode of -- Update transfer logs when serving. diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 03c7c70cf..d8be3ff42 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -19,6 +19,7 @@ import Utility.Applicative import Utility.PartialPrelude import Utility.Metered import Git.FilePath +import Annex.ChangedRefs (ChangedRefs) import Control.Monad import Control.Monad.Free @@ -50,6 +51,8 @@ data Message | AUTH_FAILURE | CONNECT Service | CONNECTDONE ExitCode + | NOTIFYCHANGE + | CHANGED ChangedRefs | CHECKPRESENT Key | LOCKCONTENT Key | UNLOCKCONTENT @@ -70,6 +73,8 @@ instance Proto.Sendable Message where formatMessage AUTH_FAILURE = ["AUTH-FAILURE"] formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service] formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode] + formatMessage NOTIFYCHANGE = ["NOTIFYCHANGE"] + formatMessage (CHANGED refs) = ["CHANGED", Proto.serialize refs] formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key] formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key] formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"] @@ -89,6 +94,8 @@ instance Proto.Receivable Message where parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE parseCommand "CONNECT" = Proto.parse1 CONNECT parseCommand "CONNECTDONE" = Proto.parse1 CONNECTDONE + parseCommand "NOTIFYCHANGE" = Proto.parse0 NOTIFYCHANGE + parseCommand "CHANGED" = Proto.parse1 CHANGED parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT @@ -227,6 +234,8 @@ data LocalF c -- from being deleted, while running the provided protocol -- action. If unable to lock the content, runs the protocol action -- with False. + | WaitRefChange (ChangedRefs -> c) + -- ^ Waits for one or more git refs to change and returns them. deriving (Functor) type Local = Free LocalF @@ -379,6 +388,10 @@ serveAuthed myuuid = void $ serverLoop handler handler (CONNECT service) = do net $ relayService service return ServerContinue + handler NOTIFYCHANGE = do + refs <- local waitRefChange + net $ sendMessage (CHANGED refs) + return ServerContinue handler _ = return ServerUnexpected sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 205165062..59502f8d3 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -17,6 +17,7 @@ import Utility.SimpleProtocol import qualified Git import Git.Command import Utility.ThreadScheduler +import Annex.ChangedRefs import Control.Concurrent.STM import Control.Concurrent.Async @@ -73,7 +74,7 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = Just SshRemote.READY -> do send (CONNECTED url) handlestdout fromh - Just (SshRemote.CHANGED shas) -> do + Just (SshRemote.CHANGED (ChangedRefs shas)) -> do whenM (checkNewShas transporthandle shas) $ fetch handlestdout fromh diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs index fa6a55d3d..606e1a563 100644 --- a/RemoteDaemon/Transport/Ssh/Types.hs +++ b/RemoteDaemon/Transport/Ssh/Types.hs @@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types ( ) where import qualified Utility.SimpleProtocol as Proto -import RemoteDaemon.Types (RefList) +import Annex.ChangedRefs (ChangedRefs) data Notification = READY - | CHANGED RefList + | CHANGED ChangedRefs instance Proto.Sendable Notification where formatMessage READY = ["READY"] diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index ba88aa685..c0d74e038 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -5,7 +5,6 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module RemoteDaemon.Types where @@ -15,6 +14,7 @@ import qualified Annex import qualified Git.Types as Git import qualified Utility.SimpleProtocol as Proto import Types.GitConfig +import Annex.ChangedRefs (ChangedRefs) import Network.URI import Control.Concurrent @@ -52,13 +52,11 @@ data Consumed = PAUSE | LOSTNET | RESUME - | CHANGED RefList + | CHANGED ChangedRefs | RELOAD | STOP deriving (Show) -type RefList = [Git.Ref] - instance Proto.Sendable Emitted where formatMessage (CONNECTED remote) = ["CONNECTED", Proto.serialize remote] @@ -100,10 +98,6 @@ instance Proto.Serializable RemoteURI where serialize (RemoteURI u) = show u deserialize = RemoteURI <$$> parseURI -instance Proto.Serializable RefList where - serialize = unwords . map Git.fromRef - deserialize = Just . map Git.Ref . words - instance Proto.Serializable Bool where serialize False = "0" serialize True = "1" diff --git a/debian/control b/debian/control index 1d2313954..f5a9de840 100644 --- a/debian/control +++ b/debian/control @@ -50,6 +50,7 @@ Build-Depends: libghc-esqueleto-dev, libghc-securemem-dev, libghc-byteable-dev, + libghc-stm-chans-dev, libghc-dns-dev, libghc-case-insensitive-dev, libghc-http-types-dev, diff --git a/git-annex.cabal b/git-annex.cabal index 465769ea0..ec54a146d 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -371,6 +371,7 @@ Executable git-annex regex-tdfa, socks, byteable, + stm-chans, securemem CC-Options: -Wall GHC-Options: -Wall -fno-warn-tabs @@ -513,6 +514,7 @@ Executable git-annex Annex.Branch.Transitions Annex.BranchState Annex.CatFile + Annex.ChangedRefs Annex.CheckAttr Annex.CheckIgnore Annex.Common |