aboutsummaryrefslogtreecommitdiff
path: root/P2P/Annex.hs
blob: 56f94d2bb7b6d8a915a27debee3ddb7e5bb2009c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
{- P2P protocol, Annex implementation
 -
 - Copyright 2016 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

{-# LANGUAGE RankNTypes, FlexibleContexts #-}

module P2P.Annex
	( RunMode(..)
	, P2PConnection(..)
	, runFullProto
	, unusedPeerRemoteName
	, linkAddress
	) where

import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import P2P.Address
import P2P.Auth
import Logs.Location
import Types.NumCopies
import Utility.Metered
import qualified Git.Command
import qualified Annex
import Annex.UUID
import Git.Types (RemoteName, remoteName, remotes)
import Config

import Control.Monad.Free

-- Which side of a P2P connection the protocol is being run on.
data RunMode
	-- Serving a peer. The UUID is the peer's; it is what gets passed
	-- to upload/download so the transfer logs are updated. The optional
	-- ChangedRefsHandle is what WaitRefChange waits on; without one,
	-- change notification is not available.
	= Serving UUID (Maybe ChangedRefsHandle)
	-- Acting as a client. Transfers are run without updating transfer
	-- logs here, and WaitRefChange is not available.
	| Client

-- Interpreter for a complete Proto action. Both halves of the protocol
-- are handled: network actions go to runNet, using the connection, and
-- local actions go to runLocal, so objects can be both sent and received.
-- Each of those is handed this same interpreter to continue with.
runFullProto :: RunMode -> P2PConnection -> Proto a -> Annex (Either String a)
runFullProto runmode conn = interpret
  where
	interpret :: RunProto Annex
	interpret proto = case proto of
		-- Nothing left to run; the protocol finished with a value.
		Pure v -> return (Right v)
		Free (Net n) -> runNet conn interpret n
		Free (Local l) -> runLocal runmode interpret l

-- Interpreter for the local actions of the protocol; the ones that
-- operate on this repository rather than on the network connection.
--
-- After an action has been carried out, the supplied runner is used to
-- continue with the rest of the protocol. Failures that are not reported
-- to the protocol as a result value end the run with Left and a
-- description of the failure.
runLocal :: RunMode -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
runLocal runmode runner a = case a of
	-- Size of the temp object file for a key. catchDefaultIO supplies 0
	-- when the size cannot be read.
	TmpContentSize k next -> do
		tmp <- fromRepo $ gitAnnexTmpObjectLocation k
		size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
		runner (next (Len size))
	-- Size of an arbitrary file, with the same fallback to 0.
	FileSize f next -> do
		size <- liftIO $ catchDefaultIO 0 $ getFileSize f
		runner (next (Len size))
	-- Size of the content of a key, as a Maybe.
	-- NOTE(review): Nothing looks to be the result when the content is
	-- not in the annex, or its size cannot be read; confirm against
	-- inAnnex'.
	ContentSize k next -> do
		let getsize = liftIO . catchMaybeIO . getFileSize
		size <- inAnnex' isJust Nothing getsize k
		runner (next (Len <$> size))
	-- Send the content of a key to the peer, starting from offset o.
	ReadContent k af o sender next -> do
		v <- tryNonAsync $ prepSendAnnex k
		case v of
			-- The check can detect if the file
			-- changed while it was transferred, but we don't
			-- use it. Instead, the receiving peer must
			-- AlwaysVerify the content it receives.
			Right (Just (f, _check)) -> do
				v' <- tryNonAsync $
					transfer upload k af $
						sinkfile f o sender
				case v' of
					-- Exception thrown while transferring.
					Left e -> return (Left (show e))
					-- The transfer ran, but yielded Left.
					Right (Left e) -> return (Left (show e))
					Right (Right ok) -> runner (next ok)
			-- content not available
 			Right Nothing -> runner (next False)
			Left e -> return (Left (show e))
	-- Receive the content of a key from the peer, by way of a temp
	-- file, with verification always enabled. A non-async exception
	-- is not fatal to the protocol; it is reported as a failed store.
	StoreContent k af o l getb next -> do
		ok <- flip catchNonAsync (const $ return False) $
			transfer download k af $ \p ->
				getViaTmp AlwaysVerify k $ \tmp ->
					unVerified $ storefile tmp o l getb p
		runner (next ok)
	-- Receive data from the peer directly into the dest file. This
	-- does not go through transfer or getViaTmp, and no progress is
	-- metered.
	StoreContentTo dest o l getb next -> do
		ok <- flip catchNonAsync (const $ return False) $
			storefile dest o l getb nullMeterUpdate
		runner (next ok)
	-- Log that the content of the key is present in the repository
	-- with UUID u.
	SetPresent k u next -> do
		v <- tryNonAsync $ logChange k u InfoPresent
		case v of
			Left e -> return (Left (show e))
			Right () -> runner next
	CheckContentPresent k next -> do
		v <- tryNonAsync $ inAnnex k
		case v of
			Left e -> return (Left (show e))
			Right result -> runner (next result)
	-- Remove the content of a key and log it as missing. When the
	-- content is not present to begin with, that is also reported as
	-- True, since there is nothing to remove.
	RemoveContent k next -> do
		v <- tryNonAsync $
			ifM (Annex.Content.inAnnex k)
				( lockContentForRemoval k $ \contentlock -> do
					removeAnnex contentlock
					logStatus k InfoMissing
					return True
				, return True
				)
		case v of
			Left e -> return (Left (show e))
			Right result -> runner (next result)
	-- Run protoaction from inside lockContentShared's callback, telling
	-- it whether a LockedCopy was obtained, and then continue with next.
	TryLockContent k protoaction next -> do
		v <- tryNonAsync $ lockContentShared k $ \verifiedcopy -> 
			case verifiedcopy of
				LockedCopy _ -> runner (protoaction True)
				_ -> runner (protoaction False)
		-- If locking fails, lockContentShared throws an exception.
		-- Let the peer know it failed.
		case v of
			Left _ -> runner $ do
				protoaction False
				next
			Right _ -> runner next
	-- Wait for refs to change. Only possible when serving with a
	-- ChangedRefsHandle; in any other mode the protocol run fails.
	WaitRefChange next -> case runmode of
		Serving _ (Just h) -> do
			v <- tryNonAsync $ liftIO $ waitChangedRefs h
			case v of
				Left e -> return (Left (show e))
				Right changedrefs -> runner (next changedrefs)
		_ -> return $ Left "change notification not available"
	-- Set up a peer remote for an address, at the request of the peer.
	AddLinkToPeer addr next -> do
		v <- tryNonAsync $ do
			-- Flood protection; don't let a huge number
			-- of peer remotes be created.
			ns <- usedPeerRemoteNames
			if length ns > 100
				then return $ Right False
				else linkAddress addr [] =<< unusedPeerRemoteName
		-- An exception, or an error message from linkAddress, is
		-- reported to the protocol as False; the reason is not
		-- passed on.
		case v of
			Right (Right r) -> runner (next r)
			_ -> runner (next False)
  where
	-- Runs a transfer action ta. mk is upload or download, which is
	-- only used when serving.
	transfer mk k af ta = case runmode of
		-- Update transfer logs when serving.
		Serving theiruuid _ -> 
			mk theiruuid k af noRetry ta noNotification
		-- Transfer logs are updated higher in the stack when
		-- a client.
		Client -> ta nullMeterUpdate
	
	-- Gets data by running the getb protocol action, and writes it to
	-- dest starting at offset o. The result is whether the size of dest
	-- afterwards equals the expected length l plus the offset.
	storefile dest (Offset o) (Len l) getb p = do
		-- Progress is reported relative to the starting offset.
		let p' = offsetMeterUpdate p (toBytesProcessed o)
		v <- runner getb
		case v of
			Right b -> liftIO $ do
				withBinaryFile dest ReadWriteMode $ \h -> do
					when (o /= 0) $
						hSeek h AbsoluteSeek o
					meteredWrite p' h b
				sz <- getFileSize dest
				return (toInteger sz == l + o)
			-- Both callers wrap storefile in catchNonAsync, which
			-- is expected to turn this into a failed store.
			Left e -> error e
	
	-- Opens file f, seeks to offset o if it is nonzero, and runs the
	-- sender protocol action on what is read from there on. bracket
	-- closes the handle once that has finished or thrown an exception.
	sinkfile f (Offset o) sender p = bracket setup cleanup go
	  where
		setup = liftIO $ openBinaryFile f ReadMode
		cleanup = liftIO . hClose
		go h = do
			let p' = offsetMeterUpdate p (toBytesProcessed o)
			when (o /= 0) $
				liftIO $ hSeek h AbsoluteSeek o
			b <- liftIO $ hGetContentsMetered h p'
			runner (sender b)

-- Finds a name for a new peer remote: the first of "peer1", "peer2", ...
-- that is not among the peer remote names already in use.
unusedPeerRemoteName :: Annex RemoteName
unusedPeerRemoteName = do
	taken <- usedPeerRemoteNames
	return $ firstfree taken (1 :: Integer)
  where
	-- Counts upward until a candidate name is not taken.
	firstfree taken n
		| candidate `elem` taken = firstfree taken (n+1)
		| otherwise = candidate
	  where
		candidate = "peer" ++ show n

-- Names of the remotes of the git repository that start with "peer".
-- Remotes that have no name are left out.
usedPeerRemoteNames :: Annex [RemoteName]
usedPeerRemoteNames = do
	repo <- Annex.gitRepo
	return
		[ name
		| Just name <- map remoteName (remotes repo)
		, "peer" `isPrefixOf` name
		]

-- Links to the peer at an address, by connecting to it, authenticating,
-- asking it to link back to each of the linkbackto addresses, and then
-- adding a git remote with the given name for it.
--
-- Right True means the remote was added (and, if any link back was
-- requested, at least one succeeded). Right False means adding the git
-- remote failed. Left carries a message describing any other problem;
-- note that when only the link back failed, the remote has still been
-- added.
linkAddress :: P2PAddressAuth -> [P2PAddressAuth] -> RemoteName -> Annex (Either String Bool)
linkAddress (P2PAddressAuth addr authtoken) linkbackto remotename = do
	repo <- Annex.gitRepo
	liftIO (tryNonAsync $ connectPeer repo addr)
		>>= either connectfailed negotiate
  where
	connectfailed e = return $ Left $ "Unable to connect with peer. Please check that the peer is connected to the network, and try again. (" ++ show e ++ ")"

	-- Authenticate with the connected peer, and request the link backs,
	-- in a single run of the protocol.
	negotiate conn = do
		myuuid <- getUUID
		result <- liftIO $ runNetProto conn $ do
			authresp <- P2P.Protocol.auth myuuid authtoken
			linkedback <- mapM P2P.Protocol.link linkbackto
			return (authresp, linkedback)
		case result of
			Left err -> return $ Left $ "Unable to authenticate with peer: " ++ err
			Right (Nothing, _) -> return $ Left "Unable to authenticate with peer. Please check the address and try again."
			Right (Just theiruuid, linkedback) -> addremote theiruuid linkedback

	-- Add the git remote, and only once that has worked, record the
	-- peer's UUID and the auth token for it.
	addremote theiruuid linkedback = do
		added <- inRepo $ Git.Command.runBool
			[ Param "remote", Param "add"
			, Param remotename
			, Param (formatP2PAddress addr)
			]
		if added
			then do
				storeUUIDIn (remoteConfig remotename "uuid") theiruuid
				storeP2PRemoteAuthToken addr authtoken
				return $ if null linkbackto || or linkedback
					then Right True
					else Left "Linked with peer. However, the peer was unable to link back to us, so the link is one-way."
			else return $ Right False