1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
{- P2P protocol, Annex implementation
-
- Copyright 2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
module P2P.Annex
( RunMode(..)
, RunEnv(..)
, runFullProto
) where
import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.Notification
import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
import Control.Monad.Free
import qualified Data.ByteString.Lazy as L
-- When we're serving a peer, we know their uuid, and can use it to update
-- transfer logs.
data RunMode
= Serving UUID
| Client
-- Full interpreter for Proto, that can receive and send objects.
runFullProto :: RunMode -> RunEnv -> Proto a -> Annex (Maybe a)
runFullProto runmode runenv = go
where
go :: RunProto Annex
go (Pure v) = pure (Just v)
go (Free (Net n)) = runNet runenv go n
go (Free (Local l)) = runLocal runmode go l
runLocal :: RunMode -> RunProto Annex -> LocalF (Proto a) -> Annex (Maybe a)
runLocal runmode runner a = case a of
TmpContentSize k next -> do
tmp <- fromRepo $ gitAnnexTmpObjectLocation k
size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
runner (next (Len size))
ContentSize k next -> do
let getsize = liftIO . catchMaybeIO . getFileSize
size <- inAnnex' isJust Nothing getsize k
runner (next (Len <$> size))
-- TODO transfer log not updated
ReadContent k af (Offset o) next -> do
v <- tryNonAsync $ prepSendAnnex k
case v of
-- The check can detect a problem after the
-- content is sent, but we don't use it.
-- Instead, the receiving peer must AlwaysVerify
-- the content it receives.
Right (Just (f, _check)) -> do
v' <- tryNonAsync $ -- transfer upload k af $
liftIO $ do
h <- openBinaryFile f ReadMode
when (o /= 0) $
hSeek h AbsoluteSeek o
L.hGetContents h
case v' of
Left _ -> return Nothing
Right b -> runner (next b)
_ -> return Nothing
WriteContent k af (Offset o) (Len l) b next -> do
ok <- flip catchNonAsync (const $ return False) $
transfer download k af $
getViaTmp AlwaysVerify k $ \tmp -> liftIO $ do
withBinaryFile tmp WriteMode $ \h -> do
when (o /= 0) $
hSeek h AbsoluteSeek o
L.hPut h b
sz <- getFileSize tmp
return (toInteger sz == l, UnVerified)
runner (next ok)
SetPresent k u next -> do
v <- tryNonAsync $ logChange k u InfoPresent
case v of
Left _ -> return Nothing
Right () -> runner next
CheckContentPresent k next -> do
v <- tryNonAsync $ inAnnex k
case v of
Left _ -> return Nothing
Right result -> runner (next result)
RemoveContent k next -> do
v <- tryNonAsync $ lockContentForRemoval k $ \contentlock -> do
removeAnnex contentlock
logStatus k InfoMissing
return True
case v of
Left _ -> return Nothing
Right result -> runner (next result)
TryLockContent k protoaction next -> do
v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
case verifiedcopy of
LockedCopy _ -> runner (protoaction True)
_ -> runner (protoaction False)
-- If locking fails, lockContentShared throws an exception.
-- Let the peer know it failed.
case v of
Left _ -> runner $ do
protoaction False
next
Right _ -> runner next
where
transfer mk k af a = case runmode of
-- Update transfer logs when serving.
Serving theiruuid ->
mk theiruuid k af noRetry (const a) noNotification
-- Transfer logs are updated higher in the stack when
-- a client.
Client -> a
|