diff options
Diffstat (limited to 'RemoteDaemon/Core.hs')
-rw-r--r-- | RemoteDaemon/Core.hs | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 60a4d5ceb..ed79c0195 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -20,24 +20,25 @@ import Utility.SimpleProtocol import Config import Annex.Ssh -import Control.Concurrent.Async import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM import Network.URI import qualified Data.Map as M runForeground :: IO () runForeground = do (readh, writeh) <- ioHandles - ichan <- newChan :: IO (Chan Consumed) - ochan <- newChan :: IO (Chan Emitted) + ichan <- newTChanIO :: IO (TChan Consumed) + ochan <- newTChanIO :: IO (TChan Emitted) let reader = forever $ do l <- hGetLine readh case parseMessage l of Nothing -> error $ "protocol error: " ++ l - Just cmd -> writeChan ichan cmd + Just cmd -> atomically $ writeTChan ichan cmd let writer = forever $ do - msg <- readChan ochan + msg <- atomically $ readTChan ochan hPutStrLn writeh $ unwords $ formatMessage msg hFlush writeh let controller = runController ichan ochan @@ -46,11 +47,11 @@ runForeground = do void $ tryIO $ reader `concurrently` writer `concurrently` controller -type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed) +type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed) -- Runs the transports, dispatching messages to them, and handling -- the main control messages. -runController :: Chan Consumed -> Chan Emitted -> IO () +runController :: TChan Consumed -> TChan Emitted -> IO () runController ichan ochan = do h <- genTransportHandle m <- genRemoteMap h ochan @@ -58,7 +59,7 @@ runController ichan ochan = do go h False m where go h paused m = do - cmd <- readChan ichan + cmd <- atomically $ readTChan ichan case cmd of RELOAD -> do h' <- updateTransportHandle h @@ -88,22 +89,28 @@ runController ichan ochan = do -- All remaining messages are sent to -- all Transports. msg -> do - unless paused $ - forM_ chans (`writeChan` msg) + unless paused $ atomically $ + forM_ chans (`writeTChan` msg) go h paused m where chans = map snd (M.elems m) startrunning m = forM_ (M.elems m) startrunning' - startrunning' (transport, _) = void $ async transport + startrunning' (transport, c) = do + -- drain any old control messages from the channel + -- to avoid confusing the transport with them + atomically $ drain c + void $ async transport + + drain c = maybe noop (const $ drain c) =<< tryReadTChan c - broadcast msg m = forM_ (M.elems m) send + broadcast msg m = atomically $ forM_ (M.elems m) send where - send (_, c) = writeChan c msg + send (_, c) = writeTChan c msg -- Generates a map with a transport for each supported remote in the git repo, -- except those that have annex.sync = false -genRemoteMap :: TransportHandle -> Chan Emitted -> IO RemoteMap +genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap genRemoteMap h@(TransportHandle g _) ochan = M.fromList . catMaybes <$> mapM gen (Git.remotes g) where @@ -111,7 +118,7 @@ genRemoteMap h@(TransportHandle g _) ochan = Git.Url u -> case M.lookup (uriScheme u) remoteTransports of Just transport | remoteAnnexSync (extractRemoteGitConfig r (Git.repoDescribe r)) -> do - ichan <- newChan :: IO (Chan Consumed) + ichan <- newTChanIO :: IO (TChan Consumed) return $ Just ( r , (transport r (RemoteURI u) h ichan ochan, ichan) |