aboutsummaryrefslogtreecommitdiff
path: root/RemoteDaemon/Core.hs
blob: 60a4d5ceb7a4ac71fd4b04a6727c158b13bb4455 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
{- git-remote-daemon core
 -
 - Copyright 2014 Joey Hess <joey@kitenet.net>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module RemoteDaemon.Core (runForeground) where

import qualified Annex
import Common
import Types.GitConfig
import RemoteDaemon.Common
import RemoteDaemon.Types
import RemoteDaemon.Transport
import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import Utility.SimpleProtocol
import Config
import Annex.Ssh

import Control.Concurrent.Async
import Control.Concurrent
import Network.URI
import qualified Data.Map as M

-- Runs the remote daemon in the foreground, speaking its line-based
-- protocol over the handles provided by ioHandles.
--
-- Three threads cooperate: one parses each input line into a message
-- for the controller, one formats and flushes each emitted message as
-- an output line, and one runs the controller itself.
runForeground :: IO ()
runForeground = do
	(input, output) <- ioHandles
	consumed <- newChan :: IO (Chan Consumed)
	emitted <- newChan :: IO (Chan Emitted)
	-- If any thread fails, the rest will be killed.
	void $ tryIO $
		feed input consumed
			`concurrently` drain output emitted
			`concurrently` runController consumed emitted
  where
	-- Parses lines read from the handle and queues the resulting
	-- messages; a line that does not parse is a fatal error.
	feed h chan = forever $ do
		line <- hGetLine h
		maybe (protocolError line) (writeChan chan) (parseMessage line)
	protocolError line = error $ "protocol error: " ++ line

	-- Writes each queued message as one line, flushing after every
	-- message so that it is not held back in the handle's buffer.
	drain h chan = forever $ do
		msg <- readChan chan
		hPutStrLn h (unwords (formatMessage msg))
		hFlush h

-- Maps each remote's Git.Repo to a pair of the IO action that runs the
-- remote's transport and the channel on which control messages are
-- sent to that transport.
type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed)

-- Starts a transport for every remote in the map, then loops reading
-- messages from the input channel: the main control messages are acted
-- on here, and every other message is passed on to the transports.
runController :: Chan Consumed -> Chan Emitted -> IO ()
runController ichan ochan = do
	th0 <- genTransportHandle
	remotes0 <- genRemoteMap th0 ochan
	launch remotes0
	controlLoop th0 False remotes0
  where
	-- The loop state is the current transport handle, whether the
	-- transports are paused, and the map of known remotes.
	controlLoop th paused remotes = readChan ichan >>= dispatch
	  where
		dispatch RELOAD = do
			th' <- updateTransportHandle th
			fresh <- genRemoteMap th' ochan
			-- Remotes present both before and after keep their
			-- existing entries; only the differences are
			-- stopped or started.
			let kept = remotes `M.intersection` fresh
			let added = fresh `M.difference` remotes
			let removed = remotes `M.difference` fresh
			sendAll STOP removed
			unless paused $
				launch added
			controlLoop th' paused (kept `M.union` added)
		dispatch LOSTNET = do
			-- Cached ssh connections are force closed here,
			-- rather than in each transport, so that it
			-- happens only once even when there are multiple
			-- ssh remotes.
			liftAnnex th forceSshCleanup
			sendAll LOSTNET remotes
			controlLoop th True remotes
		dispatch PAUSE = do
			sendAll STOP remotes
			controlLoop th True remotes
		dispatch RESUME = do
			when paused $
				launch remotes
			controlLoop th False remotes
		dispatch STOP = exitSuccess
		-- Anything else is relayed to every transport, unless
		-- paused.
		dispatch other = do
			unless paused $
				sendAll other remotes
			controlLoop th paused remotes

	-- Runs the transport action of every remote in the map, each in
	-- its own async thread.
	launch remotes = mapM_ (void . async . fst) (M.elems remotes)

	-- Writes a message to the channel of every remote in the map.
	sendAll msg remotes = mapM_ (\(_, chan) -> writeChan chan msg) (M.elems remotes)

-- Generates a map with a transport for each supported remote of the
-- local git repo, except those whose annex-sync setting is false.
--
-- A remote is supported when its location is a url whose scheme has an
-- entry in remoteTransports. Each remote in the map gets a newly created
-- input channel; all of them share the given output channel.
genRemoteMap :: TransportHandle -> Chan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle g _) ochan =
	M.fromList . catMaybes <$> mapM gen (Git.remotes g)
  where
	gen r = case Git.location r of
		Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
			Just transport
				| remoteAnnexSync gc -> do
					ichan <- newChan :: IO (Chan Consumed)
					return $ Just
						( r
						, (transport r (RemoteURI u) h ichan ochan, ichan)
						)
			_ -> return Nothing
		_ -> return Nothing
	  where
		-- The per-remote settings are looked up in the local
		-- repository g, whose remotes are being listed, using
		-- the remote's description as its name. Looking them up
		-- in the remote's own Repo value r would not find them.
		-- NOTE(review): relies on extractRemoteGitConfig reading
		-- the remote.<name>.* keys of the repo it is given, as
		-- it does elsewhere in git-annex -- confirm.
		gc = extractRemoteGitConfig g (Git.repoDescribe r)

-- Builds the initial TransportHandle: a new Annex state for the current
-- git repository, stored in an MVar, paired with that state's repo.
genTransportHandle :: IO TransportHandle
genTransportHandle = do
	st <- Git.CurrentRepo.get >>= Annex.new
	stvar <- newMVar st
	return $ TransportHandle (Annex.repo st) stvar

-- Reloads the git config inside the handle's Annex state, and returns a
-- handle that pairs the resulting repo with the same state variable.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _ stvar) =
	rebuild <$> liftAnnex h reloadedRepo
  where
	rebuild g = TransportHandle g stvar
	reloadedRepo = reloadConfig >> Annex.fromRepo id