diff options
9 files changed, 142 insertions, 53 deletions
diff --git a/Annex/ChangedRefs.hs b/Annex/ChangedRefs.hs
new file mode 100644
index 000000000..0dc82d3b3
--- /dev/null
+++ b/Annex/ChangedRefs.hs
@@ -0,0 +1,105 @@
+{- Waiting for changed git refs
+ -
+ - Copyright 2014-216 Joey Hess <>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+module Annex.ChangedRefs
+ ( ChangedRefs(..)
+ , ChangedRefsHandle
+ , waitChangedRefs
+ , drainChangedRefs
+ , stopWatchingChangedRefs
+ , watchChangedRefs
+ ) where
+import Annex.Common
+import Utility.DirWatcher
+import Utility.DirWatcher.Types
+import qualified Git
+import Git.Sha
+import qualified Utility.SimpleProtocol as Proto
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Concurrent.STM.TBMChan
+newtype ChangedRefs = ChangedRefs [Git.Ref]
+ deriving (Show)
+instance Proto.Serializable ChangedRefs where
+ serialize (ChangedRefs l) = unwords $ map Git.fromRef l
+ deserialize = Just . ChangedRefs . map Git.Ref . words
+data ChangedRefsHandle = ChangedRefsHandle DirWatcherHandle (TBMChan Git.Sha)
+-- | Wait for one or more git refs to change.
+-- When possible, coalesce ref writes that occur closely together
+-- in time. Delay up to 0.05 seconds to get more ref writes.
+waitChangedRefs :: ChangedRefsHandle -> IO ChangedRefs
+waitChangedRefs (ChangedRefsHandle _ chan) = do
+ v <- atomically $ readTBMChan chan
+ case v of
+ Nothing -> return $ ChangedRefs []
+ Just r -> do
+ threadDelay 50000
+ rs <- atomically $ loop []
+ return $ ChangedRefs (r:rs)
+ where
+ loop rs = do
+ v <- tryReadTBMChan chan
+ case v of
+ Just (Just r) -> loop (r:rs)
+ _ -> return rs
+-- | Remove any changes that might be buffered in the channel,
+-- without waiting for any new changes.
+drainChangedRefs :: ChangedRefsHandle -> IO ()
+drainChangedRefs (ChangedRefsHandle _ chan) = atomically go
+ where
+ go = do
+ v <- tryReadTBMChan chan
+ case v of
+ Just (Just _) -> go
+ _ -> return ()
+stopWatchingChangedRefs :: ChangedRefsHandle -> IO ()
+stopWatchingChangedRefs h@(ChangedRefsHandle wh chan) = do
+ stopWatchDir wh
+ atomically $ closeTBMChan chan
+ drainChangedRefs h
+watchChangedRefs :: Annex ChangedRefsHandle
+watchChangedRefs = do
+ -- This channel is used to accumulate notifications,
+ -- because the DirWatcher might have multiple threads that find
+ -- changes at the same time. It is bounded to allow a watcher
+ -- to be started once and reused, without too many changes being
+ -- buffered in memory.
+ chan <- liftIO $ newTBMChanIO 100
+ g <- gitRepo
+ let refdir = Git.localGitDir g </> "refs"
+ liftIO $ createDirectoryIfMissing True refdir
+ let notifyhook = Just $ notifyHook chan
+ let hooks = mkWatchHooks
+ { addHook = notifyhook
+ , modifyHook = notifyhook
+ }
+ h <- liftIO $ watchDir refdir (const False) True hooks id
+ return $ ChangedRefsHandle h chan
+notifyHook :: TBMChan Git.Sha -> FilePath -> Maybe FileStatus -> IO ()
+notifyHook chan reffile _
+ | ".lock" `isSuffixOf` reffile = noop
+ | otherwise = void $ do
+ sha <- catchDefaultIO Nothing $
+ extractSha <$> readFile reffile
+ -- When the channel is full, there is probably no reader
+ -- running, or ref changes have been occuring very fast,
+ -- so it's ok to not write the change to it.
+ maybe noop (void . atomically . tryWriteTBMChan chan) sha
diff --git a/Command/NotifyChanges.hs b/Command/NotifyChanges.hs
index bb9b10eee..83d7bca3f 100644
--- a/Command/NotifyChanges.hs
+++ b/Command/NotifyChanges.hs
@@ -8,6 +8,7 @@
module Command.NotifyChanges where
import Command
+import Annex.ChangedRefs
import Utility.DirWatcher
import Utility.DirWatcher.Types
import qualified Git
@@ -30,55 +31,18 @@ seek = withNothing start
start :: CommandStart
start = do
- -- This channel is used to accumulate notifcations,
- -- because the DirWatcher might have multiple threads that find
- -- changes at the same time.
- chan <- liftIO newTChanIO
- g <- gitRepo
- let refdir = Git.localGitDir g </> "refs"
- liftIO $ createDirectoryIfMissing True refdir
+ h <- watchChangedRefs
- let notifyhook = Just $ notifyHook chan
- let hooks = mkWatchHooks
- { addHook = notifyhook
- , modifyHook = notifyhook
- }
- void $ liftIO $ watchDir refdir (const False) True hooks id
- let sender = do
- send READY
- forever $ send . CHANGED =<< drain chan
-- No messages need to be received from the caller,
-- but when it closes the connection, notice and terminate.
let receiver = forever $ void $ getProtocolLine stdin
+ let sender = forever $ send . CHANGED =<< waitChangedRefs h
+ liftIO $ send READY
void $ liftIO $ concurrently sender receiver
+ liftIO $ stopWatchingChangedRefs h
-notifyHook :: TChan Git.Sha -> FilePath -> Maybe FileStatus -> IO ()
-notifyHook chan reffile _
- | ".lock" `isSuffixOf` reffile = noop
- | otherwise = void $ do
- sha <- catchDefaultIO Nothing $
- extractSha <$> readFile reffile
- maybe noop (atomically . writeTChan chan) sha
--- When possible, coalesce ref writes that occur closely together
--- in time. Delay up to 0.05 seconds to get more ref writes.
-drain :: TChan Git.Sha -> IO [Git.Sha]
-drain chan = do
- r <- atomically $ readTChan chan
- threadDelay 50000
- rs <- atomically $ drain' chan
- return (r:rs)
-drain' :: TChan Git.Sha -> STM [Git.Sha]
-drain' chan = loop []
- where
- loop rs = maybe (return rs) (\r -> loop (r:rs)) =<< tryReadTChan chan
send :: Notification -> IO ()
send n = do
putStrLn $ unwords $ formatMessage n
diff --git a/P2P/Annex.hs b/P2P/Annex.hs
index d24e65b0f..e9b59652c 100644
--- a/P2P/Annex.hs
+++ b/P2P/Annex.hs
@@ -16,6 +16,7 @@ module P2P.Annex
import Annex.Common
import Annex.Content
import Annex.Transfer
+import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import Logs.Location
@@ -114,6 +115,14 @@ runLocal runmode runner a = case a of
protoaction False
Right _ -> runner next
+ WaitRefChange next -> do
+ v <- tryNonAsync $ bracket
+ watchChangedRefs
+ (liftIO . stopWatchingChangedRefs)
+ (liftIO . waitChangedRefs)
+ case v of
+ Left e -> return (Left (show e))
+ Right changedrefs -> runner (next changedrefs)
transfer mk k af ta = case runmode of
-- Update transfer logs when serving.
diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs
index 03c7c70cf..d8be3ff42 100644
--- a/P2P/Protocol.hs
+++ b/P2P/Protocol.hs
@@ -19,6 +19,7 @@ import Utility.Applicative
import Utility.PartialPrelude
import Utility.Metered
import Git.FilePath
+import Annex.ChangedRefs (ChangedRefs)
import Control.Monad
import Control.Monad.Free
@@ -50,6 +51,8 @@ data Message
| CONNECT Service
+ | CHANGED ChangedRefs
@@ -70,6 +73,8 @@ instance Proto.Sendable Message where
formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service]
formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode]
+ formatMessage (CHANGED refs) = ["CHANGED", Proto.serialize refs]
formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key]
formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key]
@@ -89,6 +94,8 @@ instance Proto.Receivable Message where
parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE
parseCommand "CONNECT" = Proto.parse1 CONNECT
parseCommand "CONNECTDONE" = Proto.parse1 CONNECTDONE
+ parseCommand "NOTIFYCHANGE" = Proto.parse0 NOTIFYCHANGE
+ parseCommand "CHANGED" = Proto.parse1 CHANGED
parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT
parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT
parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT
@@ -227,6 +234,8 @@ data LocalF c
-- from being deleted, while running the provided protocol
-- action. If unable to lock the content, runs the protocol action
-- with False.
+ | WaitRefChange (ChangedRefs -> c)
+ -- ^ Waits for one or more git refs to change and returns them.
deriving (Functor)
type Local = Free LocalF
@@ -379,6 +388,10 @@ serveAuthed myuuid = void $ serverLoop handler
handler (CONNECT service) = do
net $ relayService service
return ServerContinue
+ handler NOTIFYCHANGE = do
+ refs <- local waitRefChange
+ net $ sendMessage (CHANGED refs)
+ return ServerContinue
handler _ = return ServerUnexpected
sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs
index 205165062..59502f8d3 100644
--- a/RemoteDaemon/Transport/Ssh.hs
+++ b/RemoteDaemon/Transport/Ssh.hs
@@ -17,6 +17,7 @@ import Utility.SimpleProtocol
import qualified Git
import Git.Command
import Utility.ThreadScheduler
+import Annex.ChangedRefs
import Control.Concurrent.STM
import Control.Concurrent.Async
@@ -73,7 +74,7 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
Just SshRemote.READY -> do
send (CONNECTED url)
handlestdout fromh
- Just (SshRemote.CHANGED shas) -> do
+ Just (SshRemote.CHANGED (ChangedRefs shas)) -> do
whenM (checkNewShas transporthandle shas) $
handlestdout fromh
diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs
index fa6a55d3d..606e1a563 100644
--- a/RemoteDaemon/Transport/Ssh/Types.hs
+++ b/RemoteDaemon/Transport/Ssh/Types.hs
@@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types (
) where
import qualified Utility.SimpleProtocol as Proto
-import RemoteDaemon.Types (RefList)
+import Annex.ChangedRefs (ChangedRefs)
data Notification
- | CHANGED RefList
+ | CHANGED ChangedRefs
instance Proto.Sendable Notification where
formatMessage READY = ["READY"]
diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs
index ba88aa685..c0d74e038 100644
--- a/RemoteDaemon/Types.hs
+++ b/RemoteDaemon/Types.hs
@@ -5,7 +5,6 @@
- Licensed under the GNU GPL version 3 or higher.
-{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module RemoteDaemon.Types where
@@ -15,6 +14,7 @@ import qualified Annex
import qualified Git.Types as Git
import qualified Utility.SimpleProtocol as Proto
import Types.GitConfig
+import Annex.ChangedRefs (ChangedRefs)
import Network.URI
import Control.Concurrent
@@ -52,13 +52,11 @@ data Consumed
- | CHANGED RefList
+ | CHANGED ChangedRefs
deriving (Show)
-type RefList = [Git.Ref]
instance Proto.Sendable Emitted where
formatMessage (CONNECTED remote) =
["CONNECTED", Proto.serialize remote]
@@ -100,10 +98,6 @@ instance Proto.Serializable RemoteURI where
serialize (RemoteURI u) = show u
deserialize = RemoteURI <$$> parseURI
-instance Proto.Serializable RefList where
- serialize = unwords . map Git.fromRef
- deserialize = Just . map Git.Ref . words
instance Proto.Serializable Bool where
serialize False = "0"
serialize True = "1"
diff --git a/debian/control b/debian/control
index 1d2313954..f5a9de840 100644
--- a/debian/control
+++ b/debian/control
@@ -50,6 +50,7 @@ Build-Depends:
+ libghc-stm-chans-dev,
diff --git a/git-annex.cabal b/git-annex.cabal
index 465769ea0..ec54a146d 100644
--- a/git-annex.cabal
+++ b/git-annex.cabal
@@ -371,6 +371,7 @@ Executable git-annex
+ stm-chans,
CC-Options: -Wall
GHC-Options: -Wall -fno-warn-tabs
@@ -513,6 +514,7 @@ Executable git-annex
+ Annex.ChangedRefs