blob: a9925c9e555896473ba822e9f62bcdd8f96c6fc2 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
{- git-annex assistant transfer watching thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.TransferWatcher where
import Assistant.Common
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.Drop
import Annex.Content
import Logs.Transfer
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Remote
import Control.Concurrent
{- Thread that watches the gitAnnexTransferDir for changes.
 -
 - The directory is created first if it does not exist yet. A hook is then
 - registered for each kind of watch event (add, delete, modify, error);
 - the add, modify and delete handlers are what keep the DaemonStatus's
 - record of ongoing transfers up to date. -}
transferWatcherThread :: NamedThread
transferWatcherThread = NamedThread "TransferWatcher" $ do
    transferdir <- liftAnnex (gitAnnexTransferDir <$> gitRepo)
    liftIO (createDirectoryIfMissing True transferdir)
    onadd <- mkhook onAdd
    ondel <- mkhook onDel
    onmodify <- mkhook onModify
    onerr <- mkhook onErr
    let hooks = mkWatchHooks
            { addHook = onadd
            , delHook = ondel
            , modifyHook = onmodify
            , errHook = onerr
            }
    -- NOTE(review): (const False) is presumably the "ignore this file"
    -- predicate, so nothing is filtered out -- confirm against watchDir.
    void . liftIO $ watchDir transferdir (const False) hooks id
    debug ["watching for transfers"]
  where
    {- Wraps a Handler in runHandler and converts it to a watch hook. -}
    mkhook handler = Just <$> asIO2 (runHandler handler)
{- An action run in response to a directory watch event. It is passed the
 - FilePath supplied with the event; onErr treats that string as an error
 - message rather than as a file name. -}
type Handler = FilePath -> Assistant ()
{- Runs a Handler on the file a watch event refers to.
 -
 - The handler is run under tryIO; an exception caught that way is printed
 - instead of being propagated, since otherwise a whole thread could be
 - crashed. The file status passed along with the event is not used.
 -}
runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
runHandler handler file _filestatus = do
    outcome <- tryIO <~> handler file
    case outcome of
        Left e -> liftIO (print e)
        Right _ -> noop
{- Called when there's an error with inotify; the error message it is
 - given is raised with error. -}
onErr :: Handler
onErr = error
{- Called when a new transfer information file is written.
 -
 - Files that do not parse as a transfer file are skipped. Otherwise the
 - transfer is checked, and if it yields transfer info, that info is
 - recorded in the DaemonStatus together with the remote (if any) whose
 - uuid matches the transfer's. -}
onAdd :: Handler
onAdd file = maybe noop started (parseTransferFile file)
  where
    -- checkTransfer yields Nothing when the transfer already finished.
    started t = liftAnnex (checkTransfer t) >>= maybe noop (record t)

    record t info = do
        debug ["transfer starting:", show t]
        remotes <- liftAnnex Remote.remoteList
        let remote = headMaybe $
                filter ((== transferUUID t) . Remote.uuid) remotes
        dstatus <- getAssistant daemonStatusHandle
        liftIO $ updateTransferInfo dstatus t $
            info { transferRemote = remote }
{- Called when a transfer information file is updated.
 -
 - Only the bytesComplete field is expected to change in the transfer
 - info, so it is the only field copied into the DaemonStatus. Nothing
 - happens if the file name does not parse as a transfer file, or if its
 - contents cannot be read as transfer info. -}
onModify :: Handler
onModify file = case parseTransferFile file of
    Just t -> do
        mnewinfo <- liftIO (readTransferInfoFile Nothing file)
        case mnewinfo of
            Just newinfo -> withAssistant daemonStatusHandle $ \h ->
                alterTransferInfo h t (setprogress newinfo)
            Nothing -> noop
    Nothing -> noop
  where
    {- Copies the byte count of the freshly read info into the old one. -}
    setprogress newinfo old = old { bytesComplete = bytesComplete newinfo }
{- This thread can only watch transfer sizes when the DirWatcher supports
 - tracking modifications to files. -}
watchesTransferSize :: Bool
watchesTransferSize = modifyTracked
{- Called when a transfer information file is removed.
 -
 - The transfer is removed from the DaemonStatus right away, while
 - finishedTransfer is run later, from a forked thread. -}
onDel :: Handler
onDel file = maybe noop finishing (parseTransferFile file)
  where
    finishing t = do
        debug ["transfer finishing:", show t]
        minfo <- (`removeTransfer` t) <<~ daemonStatusHandle
        finished <- asIO2 finishedTransfer
        {- XXX race workaround delay. The location
         - log needs to be updated before finishedTransfer
         - runs. -}
        void . liftIO . forkIO $
            threadDelay racedelay >> finished t minfo

    {- In microseconds; 10 seconds. -}
    racedelay :: Int
    racedelay = 10000000
{- Follow-up work once a transfer has finished. Nothing is done when no
 - TransferInfo is available for the transfer.
 -
 - After a download, and only if the key is now present according to
 - inAnnex: drops are handled (downloading a file may have caused a remote
 - to not want it), and uploads of the file are queued to remotes other
 - than the one it came from, spreading it out.
 -
 - After any other transfer (an upload): drops are handled as well, since
 - uploading a file may cause the local repo, or some other remote, to not
 - want it.
 -
 - NOTE(review): the Bool given to handleDrops is False after a download
 - and True after an upload; its exact meaning is defined by handleDrops.
 -}
finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
finishedTransfer _ Nothing = noop
finishedTransfer t (Just info)
    | isdownload = whenM (liftAnnex $ inAnnex key) $ do
        dstatus <- getAssistant daemonStatusHandle
        transferqueue <- getAssistant transferQueue
        liftAnnex $ handleDrops dstatus False key afile
        liftAnnex $ queueTransfersMatching notsource Later
            transferqueue dstatus key afile Upload
    | otherwise = do
        dstatus <- getAssistant daemonStatusHandle
        liftAnnex $ handleDrops dstatus True key afile
  where
    key = transferKey t
    afile = associatedFile info
    isdownload = transferDirection t == Download
    {- Matches every uuid except the one this transfer was made with. -}
    notsource u = u /= transferUUID t
|