From d80aa496961438584c6eb9e5330eb6beba46ec04 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 15 Jan 2015 15:37:48 -0400 Subject: remotedaemon: Fix problem that could prevent ssh connections being made after two LOSTNET messages were received in a row Perhaps due to two different network interfaces being brought down. Since there is no reliable way to drain a Chan, I switched to STM TChan. --- RemoteDaemon/Core.hs | 37 ++++++++++++++++++++++--------------- RemoteDaemon/Transport/Ssh.hs | 6 +++--- RemoteDaemon/Types.hs | 3 ++- 3 files changed, 27 insertions(+), 19 deletions(-) (limited to 'RemoteDaemon') diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 60a4d5ceb..ed79c0195 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -20,24 +20,25 @@ import Utility.SimpleProtocol import Config import Annex.Ssh -import Control.Concurrent.Async import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM import Network.URI import qualified Data.Map as M runForeground :: IO () runForeground = do (readh, writeh) <- ioHandles - ichan <- newChan :: IO (Chan Consumed) - ochan <- newChan :: IO (Chan Emitted) + ichan <- newTChanIO :: IO (TChan Consumed) + ochan <- newTChanIO :: IO (TChan Emitted) let reader = forever $ do l <- hGetLine readh case parseMessage l of Nothing -> error $ "protocol error: " ++ l - Just cmd -> writeChan ichan cmd + Just cmd -> atomically $ writeTChan ichan cmd let writer = forever $ do - msg <- readChan ochan + msg <- atomically $ readTChan ochan hPutStrLn writeh $ unwords $ formatMessage msg hFlush writeh let controller = runController ichan ochan @@ -46,11 +47,11 @@ runForeground = do void $ tryIO $ reader `concurrently` writer `concurrently` controller -type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed) +type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed) -- Runs the transports, dispatching messages to them, and handling -- the main control messages. -runController :: Chan Consumed -> Chan Emitted -> IO () +runController :: TChan Consumed -> TChan Emitted -> IO () runController ichan ochan = do h <- genTransportHandle m <- genRemoteMap h ochan @@ -58,7 +59,7 @@ runController ichan ochan = do go h False m where go h paused m = do - cmd <- readChan ichan + cmd <- atomically $ readTChan ichan case cmd of RELOAD -> do h' <- updateTransportHandle h @@ -88,22 +89,28 @@ runController ichan ochan = do -- All remaining messages are sent to -- all Transports. msg -> do - unless paused $ - forM_ chans (`writeChan` msg) + unless paused $ atomically $ + forM_ chans (`writeTChan` msg) go h paused m where chans = map snd (M.elems m) startrunning m = forM_ (M.elems m) startrunning' - startrunning' (transport, _) = void $ async transport + startrunning' (transport, c) = do + -- drain any old control messages from the channel + -- to avoid confusing the transport with them + atomically $ drain c + void $ async transport + + drain c = maybe noop (const $ drain c) =<< tryReadTChan c - broadcast msg m = forM_ (M.elems m) send + broadcast msg m = atomically $ forM_ (M.elems m) send where - send (_, c) = writeChan c msg + send (_, c) = writeTChan c msg -- Generates a map with a transport for each supported remote in the git repo, -- except those that have annex.sync = false -genRemoteMap :: TransportHandle -> Chan Emitted -> IO RemoteMap +genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap genRemoteMap h@(TransportHandle g _) ochan = M.fromList . catMaybes <$> mapM gen (Git.remotes g) where @@ -111,7 +118,7 @@ genRemoteMap h@(TransportHandle g _) ochan = Git.Url u -> case M.lookup (uriScheme u) remoteTransports of Just transport | remoteAnnexSync (extractRemoteGitConfig r (Git.repoDescribe r)) -> do - ichan <- newChan :: IO (Chan Consumed) + ichan <- newTChanIO :: IO (TChan Consumed) return $ Just ( r , (transport r (RemoteURI u) h ichan ochan, ichan) diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index afedf559e..6315ede85 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -18,7 +18,7 @@ import qualified Git import Git.Command import Utility.ThreadScheduler -import Control.Concurrent.Chan +import Control.Concurrent.STM import Control.Concurrent.Async transport :: Transport @@ -58,7 +58,7 @@ transport' r url transporthandle ichan ochan = do return $ either (either id id) id status - send msg = writeChan ochan msg + send msg = atomically $ writeTChan ochan msg fetch = do send (SYNCING url) @@ -80,7 +80,7 @@ transport' r url transporthandle ichan ochan = do Nothing -> return Stopping handlecontrol = do - msg <- readChan ichan + msg <- atomically $ readTChan ichan case msg of STOP -> return Stopping LOSTNET -> return Stopping diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index 7413f5851..bdc94d949 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -17,6 +17,7 @@ import qualified Utility.SimpleProtocol as Proto import Network.URI import Control.Concurrent +import Control.Concurrent.STM -- The URI of a remote is used to uniquely identify it (names change..) newtype RemoteURI = RemoteURI URI @@ -24,7 +25,7 @@ newtype RemoteURI = RemoteURI URI -- A Transport for a particular git remote consumes some messages -- from a Chan, and emits others to another Chan. -type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO () +type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> TChan Consumed -> TChan Emitted -> IO () type RemoteRepo = Git.Repo type LocalRepo = Git.Repo -- cgit v1.2.3