aboutsummaryrefslogtreecommitdiff
path: root/RemoteDaemon/Transport/Ssh.hs
blob: 6315ede85c21d926ac98ea77fd5150e9d832c66c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
{- git-remote-daemon, git-annex-shell over ssh transport
 -
 - Copyright 2014 Joey Hess <joey@kitenet.net>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module RemoteDaemon.Transport.Ssh (transport) where

import Common.Annex
import Annex.Ssh
import RemoteDaemon.Types
import RemoteDaemon.Common
import Remote.Helper.Ssh
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
import Utility.SimpleProtocol
import qualified Git
import Git.Command
import Utility.ThreadScheduler

import Control.Concurrent.STM
import Control.Concurrent.Async

{- Entry point of the ssh transport.
 -
 - Passes the repository from the TransportHandle through sshCachingTo,
 - so that ssh connection caching is enabled wherever inLocalRepo is
 - called, then hands over to transport' with the updated handle. -}
transport :: Transport
transport r url h@(TransportHandle g s) ichan ochan =
	liftAnnex h (sshCachingTo r g) >>= \cachedrepo ->
		transport' r url (TransportHandle cachedrepo s) ichan ochan

{- Runs the "notifychanges" command of git-annex-shell for the remote,
 - and relays what it reports to the output channel.
 -
 - When git_annex_shell yields no command for the remote, nothing is
 - done. Otherwise the connection is made by robustly, starting with a
 - backoff of 1 second, until one of the handlers returns Stopping. -}
transport' :: Transport
transport' r url transporthandle ichan ochan = do

	v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] []
	case v of
		Nothing -> noop
		Just (cmd, params) -> robustly 1 $
			connect cmd (toCommand params)
  where
	-- Runs one connection: starts the command with pipes on stdin,
	-- stdout and stderr, services it until the first handler
	-- finishes, then tears everything down and returns that
	-- handler's Status.
	connect cmd params = do
		(Just toh, Just fromh, Just errh, pid) <-
			createProcess (proc cmd params)
			{ std_in = CreatePipe
			, std_out = CreatePipe
			, std_err = CreatePipe
			}
		
		-- Run all threads until one finishes and get the status
		-- of the first to finish. Cancel the rest.
		-- A failure handled by catchDefaultIO counts as
		-- ConnectionClosed.
		status <- catchDefaultIO (Right ConnectionClosed) $
			handlestderr errh
				`race` handlestdout fromh
				`race` handlecontrol

		send (DISCONNECTED url)
		hClose toh
		hClose fromh
		-- Close the stderr pipe too. Otherwise each reconnect
		-- leaves a handle open, and since nothing reads stderr
		-- any longer, a process still writing to it could
		-- block on a full pipe and stall waitForProcess.
		hClose errh
		void $ waitForProcess pid

		-- The nested race results all carry a Status; unwrap
		-- whichever one finished first.
		return $ either (either id id) id status

	-- Queues a message on the output channel.
	send msg = atomically $ writeTChan ochan msg

	-- Runs git fetch from the remote in the local repository,
	-- reporting SYNCING before and DONESYNCING, with the result of
	-- the fetch, after.
	fetch = do
		send (SYNCING url)
		ok <- inLocalRepo transporthandle $
			runBool [Param "fetch", Param $ Git.repoDescribe r]
		send (DONESYNCING url ok)
		
	-- Reads the command's stdout a line at a time. READY is
	-- reported as CONNECTED; CHANGED triggers a fetch when
	-- checkNewShas says so. Loops until a line fails to parse.
	handlestdout fromh = do
		l <- hGetLine fromh
		case parseMessage l of
			Just SshRemote.READY -> do
				send (CONNECTED url)
				handlestdout fromh
			Just (SshRemote.CHANGED shas) -> do
				whenM (checkNewShas transporthandle shas) $
					fetch
				handlestdout fromh
			-- avoid reconnect on protocol error
			Nothing -> return Stopping
	
	-- Waits on the input channel for STOP or LOSTNET, either of
	-- which ends the connection without a reconnect. Any other
	-- message is ignored.
	handlecontrol = do
		msg <- atomically $ readTChan ichan
		case msg of
			STOP -> return Stopping
			LOSTNET -> return Stopping
			_ -> handlecontrol

	-- Old versions of git-annex-shell that do not support
	-- the notifychanges command will exit with a not very useful
	-- error message. Detect that error, and avoid reconnecting.
	-- Propagate all stderr.
	--
	-- NOTE(review): if hGetSomeString returns an empty string at
	-- EOF, this loop keeps polling until one of the other threads
	-- finishes -- confirm.
	handlestderr errh = do
		s <- hGetSomeString errh 1024
		hPutStr stderr s
		hFlush stderr
		if "git-annex-shell: git-shell failed" `isInfixOf` s
			then do
				send $ WARNING url $ unwords
					[ "Remote", Git.repoDescribe r
					, "needs its git-annex upgraded"
					, "to 5.20140405 or newer"
					]
				return Stopping
			else handlestderr errh

{- How a connection attempt ended. robustly gives up on Stopping, and
 - reconnects after a delay on ConnectionClosed. -}
data Status
	= Stopping
	| ConnectionClosed

{- Runs a connection action over and over, with exponential backoff,
 - until it returns Stopping.
 -
 - Each time the action returns ConnectionClosed (which is also the
 - result used when catchDefaultIO handles a failure of the action),
 - this waits for the current backoff in seconds and tries again with
 - the backoff doubled, up to a limit of one hour. -}
robustly :: Int -> IO Status -> IO ()
robustly backoff a = do
	status <- catchDefaultIO ConnectionClosed a
	case status of
		Stopping -> return ()
		ConnectionClosed -> do
			threadDelaySeconds (Seconds backoff)
			robustly (min maxbackoff (backoff * 2)) a
  where
	maxbackoff = 3600 -- one hour