summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/DaemonStatus.hs9
-rw-r--r--Assistant/Threads/TransferWatcher.hs20
-rw-r--r--Assistant/Threads/Transferrer.hs102
-rw-r--r--Assistant/TransferQueue.hs8
4 files changed, 124 insertions, 15 deletions
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index a3e909904..40816bb1a 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -31,12 +31,14 @@ data DaemonStatus = DaemonStatus
-- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime
-- Currently running file content transfers
- , currentTransfers :: M.Map Transfer TransferInfo
+ , currentTransfers :: TransferMap
-- Ordered list of remotes to talk to.
, knownRemotes :: [Remote]
}
deriving (Show)
+type TransferMap = M.Map Transfer TransferInfo
+
type DaemonStatusHandle = MVar DaemonStatus
newDaemonStatus :: DaemonStatus
@@ -132,3 +134,8 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
+
+{- Mutates the transfer map. -}
+adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
+adjustTransfers dstatus a = modifyDaemonStatus dstatus $
+ \s -> s { currentTransfers = a (currentTransfers s) }
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index 811b045a8..f18d4e3f8 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -58,21 +58,17 @@ onAdd :: Handler
onAdd st dstatus file _ = case parseTransferFile file of
Nothing -> noop
Just t -> do
- minfo <- runThreadState st $ checkTransfer t
pid <- getProcessID
- case minfo of
- Nothing -> noop -- transfer already finished
- Just info
- | transferPid info == Just pid -> noop
- | otherwise -> adjustTransfers st dstatus
- (M.insertWith' const t info)
+ runThreadState st $ go t pid =<< checkTransfer t
+ where
+ go _ _ Nothing = noop -- transfer already finished
+ go t pid (Just info)
+ | transferPid info == Just pid = noop
+ | otherwise = adjustTransfers dstatus $
+ M.insertWith' const t info
{- Called when a transfer information file is removed. -}
onDel :: Handler
onDel st dstatus file _ = case parseTransferFile file of
Nothing -> noop
- Just t -> adjustTransfers st dstatus (M.delete t)
-
-adjustTransfers :: ThreadState -> DaemonStatusHandle -> (M.Map Transfer TransferInfo -> M.Map Transfer TransferInfo) -> IO ()
-adjustTransfers st dstatus a = runThreadState st $ modifyDaemonStatus dstatus $
- \s -> s { currentTransfers = a (currentTransfers s) }
+ Just t -> runThreadState st $ adjustTransfers dstatus $ M.delete t
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
new file mode 100644
index 000000000..0562a607c
--- /dev/null
+++ b/Assistant/Threads/Transferrer.hs
@@ -0,0 +1,102 @@
+{- git-annex assistant data transferrer thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Transferrer where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Logs.Transfer
+import Annex.Content
+import Annex.BranchState
+import Command
+import qualified Command.Move
+
+import Control.Exception as E
+import Control.Concurrent
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Dispatches transfers from the queue.
+ -
+ - This is currently very simplistic, and runs only one transfer at a time.
+ -}
+transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO ()
+transfererThread st dstatus transferqueue = do
+ mypid <- getProcessID
+ mytid <- myThreadId
+ go mypid mytid
+ where
+ go mypid mytid = do
+ (t, info) <- getNextTransfer transferqueue
+
+ now <- getCurrentTime
+ let info' = info
+ { startedTime = Just now
+ , transferPid = Just mypid
+ , transferThread = Just mytid
+ }
+
+ ifM (runThreadState st $ shouldtransfer t info')
+ ( runTransfer st t info'
+ , noop
+ )
+ go mypid mytid
+
+ -- Check if the transfer is already running,
+ -- and if not, add it to the TransferMap.
+ shouldtransfer t info = do
+ current <- currentTransfers <$> getDaemonStatus dstatus
+ if M.member t current
+ then ifM (validtransfer t)
+ ( do
+ adjustTransfers dstatus $
+ M.insertWith' const t info
+ return True
+ , return False
+ )
+ else return False
+
+ validtransfer t
+ | transferDirection t == Download =
+ not <$> inAnnex (transferKey t)
+ | otherwise = return True
+
+{- A transfer is run in a separate thread, with a *copy* of the Annex
+ - state. This is necessary to avoid blocking the rest of the assistant
+ - on the transfer completing, and also to allow multiple transfers to run
+ - at once.
+ -
+ - However, it means that the transfer threads are responsible
+ - for doing any necessary shutdown cleanups, and that the parent
+ - thread's cache must be invalidated, as changes may have been made to the
+ - git-annex branch.
+ -
+ - Currently a minimal shutdown is done; the transfer threads are
+ - effectively running in oneshot mode, without committing changes to the
+ - git-annex branch, and transfers should never queue git commands to run.
+ -
+ - Note: It is unsafe to call getDaemonStatus inside the transfer thread.
+ -}
+runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ()
+runTransfer st t info
+ | transferDirection t == Download = go Command.Move.fromStart
+ | otherwise = go Command.Move.toStart
+ where
+ go cmd = case (transferRemote info, associatedFile info) of
+ (Nothing, _) -> noop
+ (_, Nothing) -> noop
+ (Just remote, Just file) ->
+ inthread $ void $ doCommand $
+ cmd remote False file (transferKey t)
+ inthread a = do
+ mvar <- newEmptyMVar
+ void $ forkIO $
+ runThreadState st a `E.finally` putMVar mvar ()
+ void $ takeMVar mvar -- wait for transfer thread
+ runThreadState st invalidateCache
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index f1f4882be..a35815ca1 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -25,6 +25,7 @@ stubInfo f = TransferInfo
{ startedTime = Nothing
, transferPid = Nothing
, transferThread = Nothing
+ , transferRemote = Nothing
, bytesComplete = Nothing
, associatedFile = f
}
@@ -33,7 +34,7 @@ stubInfo f = TransferInfo
- remotes. -}
queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers q daemonstatus k f direction =
- mapM_ (liftIO . queueTransfer q f . gentransfer)
+ mapM_ (\r -> queue r $ gentransfer r)
=<< sufficientremotes . knownRemotes
<$> getDaemonStatus daemonstatus
where
@@ -53,8 +54,11 @@ queueTransfers q daemonstatus k f direction =
gentransfer r = Transfer
{ transferDirection = direction
, transferKey = k
- , transferRemote = Remote.uuid r
+ , transferUUID = Remote.uuid r
}
+ queue r t = liftIO $ void $ atomically $ do
+ let info = (stubInfo f) { transferRemote = Just r }
+ writeTChan q (t, info)
{- Adds a pending transfer to the end of the queue. -}
queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()