1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
{- git-annex assistant data transferrer thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.Transferrer where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.TransferSlots
import Logs.Transfer
import Annex.Content
import Utility.ThreadScheduler
import Command
import qualified Command.Move
import Data.Time.Clock
import qualified Data.Map as M
{- For now only one transfer is run at a time. -}
maxTransfers :: Int
maxTransfers = 1
{- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
transfererThread st dstatus transferqueue slots = runEvery (Seconds 1) $ do
(t, info) <- getNextTransfer transferqueue
whenM (runThreadState st $ shouldTransfer dstatus t) $
void $ inTransferSlot slots $
runTransfer st dstatus t info
{- Checks if the requested transfer is already running, or
- the file to download is already present. -}
shouldTransfer :: DaemonStatusHandle -> Transfer -> Annex Bool
shouldTransfer dstatus t = go =<< currentTransfers <$> getDaemonStatus dstatus
where
go m
| M.member t m = return False
| transferDirection t == Download =
not <$> inAnnex (transferKey t)
| otherwise = return True
{- A transfer is run in a separate process, with a *copy* of the Annex
- state. This is necessary to avoid blocking the rest of the assistant
- on the transfer completing, and also to allow multiple transfers to run
- at once.
-
- However, it means that the transfer processes are responsible
- for doing any necessary shutdown cleanups, and that the parent
- thread's cache must be invalidated once a transfer completes, as
- changes may have been made to the git-annex branch.
-
- Currently a minimal shutdown is done; the transfer processes are
- effectively running in oneshot mode, without committing changes to the
- git-annex branch, and transfers should never queue git commands to run.
-}
runTransfer :: ThreadState -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
runTransfer st dstatus t info
| transferDirection t == Download = go Command.Move.fromStart
| otherwise = go Command.Move.toStart
where
go cmd = case (transferRemote info, associatedFile info) of
(Nothing, _) -> noop
(_, Nothing) -> noop
(Just remote, Just file) -> do
now <- getCurrentTime
pid <- forkProcess $ unsafeRunThreadState st $ void $
doCommand $ cmd remote False file (transferKey t)
runThreadState st $
adjustTransfers dstatus $
M.insertWith' const t info
{ startedTime = Just now
, transferPid = Just pid
, shouldWait = True
}
|