summaryrefslogtreecommitdiff
path: root/RemoteDaemon/Core.hs
blob: 399b1553af2231c526c27297d5560a4efb4794b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
{- git-remote-daemon core
 -
 - Copyright 2014-2016 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module RemoteDaemon.Core (runInteractive, runNonInteractive) where

import qualified Annex
import Common
import Types.GitConfig
import Config.DynamicConfig
import RemoteDaemon.Common
import RemoteDaemon.Types
import RemoteDaemon.Transport
import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import Utility.SimpleProtocol
import Utility.ThreadScheduler
import Config
import Annex.Ssh
import Types.Messages

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Network.URI
import qualified Data.Map as M

-- Runs the daemon with its control protocol spoken over the two handles
-- returned by dupIoHandles: each line read is parsed into a message for
-- the controller, and each message emitted on the output channel is
-- written out as a line.
runInteractive :: IO ()
runInteractive = do
	(readh, writeh) <- dupIoHandles
	ichan <- newTChanIO :: IO (TChan Consumed)
	ochan <- newTChanIO :: IO (TChan Emitted)
	-- If any thread fails, the rest will be killed.
	void $ tryIO $
		readloop readh ichan
			`concurrently` writeloop writeh ochan
			`concurrently` runController ichan ochan
  where
	-- Feeds parsed input lines to the controller. A line that
	-- parseMessage rejects is a fatal protocol error.
	readloop inh ichan = forever $ do
		line <- hGetLine inh
		maybe (protocolerror line) (atomically . writeTChan ichan)
			(parseMessage line)
	protocolerror line = error $ "protocol error: " ++ line
	-- Writes each emitted message as a single line of space-separated
	-- words, flushing after every message.
	writeloop outh ochan = forever $ do
		atomically (readTChan ochan)
			>>= hPutStrLn outh . unwords . formatMessage
		hFlush outh

-- Runs the daemon without speaking the protocol to anything: no input
-- is read, and whatever is emitted on the output channel is thrown away.
-- The only message the controller receives is a RELOAD, sent after each
-- delay of Seconds (60*60).
runNonInteractive :: IO ()
runNonInteractive = do
	ichan <- newTChanIO :: IO (TChan Consumed)
	ochan <- newTChanIO :: IO (TChan Emitted)
	void $ tryIO $
		reloader ichan
			`concurrently` discarder ochan
			`concurrently` runController ichan ochan
  where
	-- Periodically asks the controller to reload.
	reloader ichan = forever $
		threadDelaySeconds (Seconds (60*60))
			>> atomically (writeTChan ichan RELOAD)
	-- Keeps reading the output channel, discarding each message.
	discarder ochan = forever $ void $ atomically $ readTChan ochan

-- Maps each remote's git repo to a pair of the IO action that runs the
-- remote's transport, and the channel that the controller uses to send
-- messages to that transport.
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)

-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
--
-- Every message read from ichan is first relayed to the remote servers,
-- and then either handled here or passed on to the transports.
-- The paused flag is set by PAUSE and LOSTNET and cleared by RESUME;
-- while it is set, transports found by a RELOAD are not started and
-- other messages are not passed on to the transports.
runController :: TChan Consumed -> TChan Emitted -> IO ()
runController ichan ochan = do
	h <- genTransportHandle
	remotes <- genRemoteMap h ochan
	starttransports remotes
	serverchans <- mapM (startserver h) remoteServers
	loop h False remotes serverchans
  where
	loop h paused remotes serverchans = do
		cmd <- atomically $ readTChan ichan
		broadcast cmd serverchans
		case cmd of
			RELOAD -> do
				h' <- updateTransportHandle h
				remotes' <- genRemoteMap h' ochan
				-- Remotes present before and after keep
				-- their existing transport; removed ones
				-- are told to stop.
				let kept = M.intersection remotes remotes'
				let added = M.difference remotes' remotes
				let removed = M.difference remotes remotes'
				broadcast STOP (chansof removed)
				unless paused $
					starttransports added
				loop h' paused (M.union kept added) serverchans
			LOSTNET -> do
				-- force close all cached ssh connections
				-- (done here so that if there are multiple
				-- ssh remotes, it's only done once)
				liftAnnex h forceSshCleanup
				broadcast LOSTNET allchans
				continue True
			PAUSE -> do
				broadcast STOP allchans
				continue True
			RESUME -> do
				when paused $
					starttransports remotes
				continue False
			STOP -> exitSuccess
			-- All remaining messages are sent to
			-- all Transports.
			_ -> do
				unless paused $
					broadcast cmd allchans
				continue paused
	  where
		allchans = chansof remotes
		chansof = map snd . M.elems
		-- Loops again with the same handle and remotes.
		continue paused' = loop h paused' remotes serverchans
	
	-- Runs a server in its own thread, returning the channel
	-- that was handed to it.
	startserver h server = do
		c <- newTChanIO
		void $ async $ server c h
		return c

	starttransports = mapM_ starttransport . M.elems
	starttransport (transport, c) = do
		-- drain any old control messages from the channel
		-- to avoid confusing the transport with them
		atomically $ drain c
		void $ async transport
	
	-- Reads from the channel until it is empty.
	drain c = do
		v <- tryReadTChan c
		case v of
			Nothing -> noop
			Just _ -> drain c
	
	-- Writes the message to all the channels in one transaction.
	broadcast msg cs = atomically $ mapM_ (\c -> writeTChan c msg) cs

-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
--
-- A remote is supported when its location is a URL whose scheme has an
-- entry in remoteTransports. The transport action is only constructed
-- here, not run; it is paired with a newly created channel that is
-- passed to it as its input.
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do
	entries <- mapM mkentry (Git.remotes g)
	return $ M.fromList $ catMaybes entries
  where
	-- The remote's git config is extracted before checking whether
	-- the remote is supported.
	mkentry r = do
		gc <- atomically $ extractRemoteGitConfig g (Git.repoDescribe r)
		case findtransport r of
			Nothing -> return Nothing
			Just (u, transport) -> ifM (getDynamicConfig (remoteAnnexSync gc))
				( Just <$> prepare r gc u transport
				, return Nothing
				)
	-- Looks up the transport for the remote's url scheme, if any.
	findtransport r = case Git.location r of
		Git.Url u -> (,) u <$> M.lookup (uriScheme u) remoteTransports
		_ -> Nothing
	-- Builds the map entry for a remote, with its own input channel.
	prepare r gc u transport = do
		ichan <- newTChanIO :: IO (TChan Consumed)
		return
			( r
			, (transport (RemoteRepo r gc) (RemoteURI u) h ichan ochan, ichan)
			)

-- Creates a TransportHandle for the current git repository: a new Annex
-- state is made for it and stored in an MVar, and that state's output
-- is set to QuietOutput.
genTransportHandle :: IO TransportHandle
genTransportHandle = do
	annexstate <- Git.CurrentRepo.get >>= Annex.new >>= newMVar
	st <- readMVar annexstate
	let h = TransportHandle (LocalRepo (Annex.repo st)) annexstate
	liftAnnex h (Annex.setOutput QuietOutput)
	return h

-- Reloads the config within the handle's Annex state, and returns a
-- handle holding the repo as it is after the reload. The returned handle
-- shares the same Annex state as the one passed in.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _ annexstate) =
	rebuild <$> liftAnnex h (reloadConfig >> Annex.fromRepo id)
  where
	rebuild g = TransportHandle (LocalRepo g) annexstate