summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-07-05 18:57:06 -0600
committerGravatar Joey Hess <joey@kitenet.net>2012-07-05 18:57:06 -0600
commita92f5589fcf5549832914fdee34596818bfdc583 (patch)
tree45712848f8a7bddf19c90fc082ee657c40243a38
parent0c563c39dfcd515b115aa37c03551dceffb882c0 (diff)
unfinished (and unbuildable) work toward separate transfer processes
-rw-r--r--Assistant.hs24
-rw-r--r--Assistant/Threads/Transferrer.hs103
-rw-r--r--Logs/Transfer.hs5
3 files changed, 63 insertions, 69 deletions
diff --git a/Assistant.hs b/Assistant.hs
index e751b4ae8..38ed539a1 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -31,14 +31,15 @@
- them.
- Thread 8: merger
- Waits for pushes to be received from remotes, and merges the
- - updated branches into the current branch. This uses inotify
- - on .git/refs/heads, so there are additional inotify threads
- - associated with it, too.
+ - updated branches into the current branch.
+ - (This uses inotify on .git/refs/heads, so there are additional
+ - inotify threads associated with it, too.)
- Thread 9: transfer watcher
- Watches for transfer information files being created and removed,
- - and maintains the DaemonStatus currentTransfers map. This uses
- - inotify on .git/annex/transfer/, so there are additional inotify
- - threads associated with it, too.
+ - and maintains the DaemonStatus currentTransfers map and the
+ - TransferSlots QSemN.
+ - (This uses inotify on .git/annex/transfer/, so there are
+ - additional inotify threads associated with it, too.)
- Thread 10: transferrer
- Waits for Transfers to be queued and does them.
- Thread 11: status logger
@@ -66,6 +67,12 @@
- retrier blocks until they're available.
- TransferQueue (STM TChan)
- Transfers to make are indicated by writing to this channel.
+ - TransferSlots (QSemN)
+ - Count of the number of currently available transfer slots.
+ - Updated by the transfer watcher, this allows other threads
+ - to block until a slot is available.
+ - This MVar should only be manipulated from inside the Annex monad,
+ - which ensures it's accessed only after the ThreadState MVar.
-}
module Assistant where
@@ -109,15 +116,16 @@ startDaemon assistant foreground
commitchan <- newCommitChan
pushmap <- newFailedPushMap
transferqueue <- newTransferQueue
+ transferslots <- newTransferSlots
mapM_ (void . forkIO)
[ commitThread st changechan commitchan transferqueue dstatus
, pushThread st dstatus commitchan pushmap
, pushRetryThread st pushmap
, mergeThread st
- , transferWatcherThread st dstatus
+ , transferWatcherThread st dstatus transferslots
+ , transfererThread st dstatus transferqueue transferslots
, daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan
- , transfererThread st dstatus transferqueue
, watchThread st dstatus transferqueue changechan
]
waitForTermination
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 0b47e9781..249e15cf2 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -14,6 +14,7 @@ import Assistant.TransferQueue
import Logs.Transfer
import Annex.Content
import Annex.BranchState
+import Utility.ThreadScheduler
import Command
import qualified Command.Move
@@ -22,68 +23,58 @@ import Control.Concurrent
import Data.Time.Clock
import qualified Data.Map as M
-{- Dispatches transfers from the queue.
- -
- - This is currently very simplistic, and runs only one transfer at a time.
- -}
+{- For now only one transfer is run at a time. -}
+maxTransfers :: Int
+maxTransfers = 1
+
+{- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO ()
-transfererThread st dstatus transferqueue = do
- mypid <- getProcessID
- mytid <- myThreadId
- go mypid mytid
+transfererThread st dstatus transferqueue = runEvery (Seconds 1) $ do
+ (t, info) <- getNextTransfer transferqueue
+ go =<< runThreadState st $ shouldTransfer t
where
- go mypid mytid = do
- (t, info) <- getNextTransfer transferqueue
-
- now <- getCurrentTime
- let info' = info
- { startedTime = Just now
- , transferPid = Just mypid
- , transferThread = Just mytid
- }
-
- ifM (runThreadState st $ shouldtransfer t info')
- ( runTransfer st t info'
- , noop
- )
- go mypid mytid
+ go Yes = runTransfer st t
+ go No = noop
+ go TooMany = waitTransfer >> go Yes
- -- Check if the transfer is already running,
- -- and if not, add it to the TransferMap.
- shouldtransfer t info = do
- current <- currentTransfers <$> getDaemonStatus dstatus
- if M.member t current
- then return False
- else ifM (validtransfer t)
- ( do
- adjustTransfers dstatus $
- M.insertWith' const t info
- return True
- , return False
- )
+data ShouldTransfer = Yes | Skip | TooMany
- validtransfer t
+{- Checks if the requested transfer is already running, or
+ - the file to download is already present.
+ -
+ - There also may be too many transfers already running to service this
+ - transfer yet. -}
+shouldTransfer :: DaemonStatusHandle -> Transfer -> Annex ShouldTransfer
+shouldTransfer dstatus t = go =<< currentTransfers <$> getDaemonStatus dstatus
+ where
+ go m
+ | M.member t m = return Skip
+ | M.size m > maxTransfers = return TooMany
| transferDirection t == Download =
- not <$> inAnnex (transferKey t)
- | otherwise = return True
+ ifM (inAnnex $ transferKey t) (No, Yes)
+ | otherwise = return Yes
-{- A transfer is run in a separate thread, with a *copy* of the Annex
+{- Waits for any of the transfers in the map to complete. -}
+waitTransfer :: IO ()
+waitTransfer = error "TODO"
+-- getProcessStatus True False pid
+-- runThreadState st invalidateCache
+
+{- A transfer is run in a separate process, with a *copy* of the Annex
- state. This is necessary to avoid blocking the rest of the assistant
- on the transfer completing, and also to allow multiple transfers to run
- at once.
-
- - However, it means that the transfer threads are responsible
+ - However, it means that the transfer processes are responsible
- for doing any necessary shutdown cleanups, and that the parent
- - thread's cache must be invalidated, as changes may have been made to the
- - git-annex branch.
+ - thread's cache must be invalidated once a transfer completes, as
+ - changes may have been made to the git-annex branch.
-
- - Currently a minimal shutdown is done; the transfer threads are
+ - Currently a minimal shutdown is done; the transfer processes are
- effectively running in oneshot mode, without committing changes to the
- git-annex branch, and transfers should never queue git commands to run.
- -
- - Note: It is unsafe to call getDaemonStatus inside the transfer thread.
-}
-runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ()
+runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ProcessID
runTransfer st t info
| transferDirection t == Download = go Command.Move.fromStart
| otherwise = go Command.Move.toStart
@@ -91,12 +82,12 @@ runTransfer st t info
go cmd = case (transferRemote info, associatedFile info) of
(Nothing, _) -> noop
(_, Nothing) -> noop
- (Just remote, Just file) ->
- inthread $ void $ doCommand $
- cmd remote False file (transferKey t)
- inthread a = do
- mvar <- newEmptyMVar
- void $ forkIO $
- unsafeRunThreadState st a `E.finally` putMVar mvar ()
- void $ takeMVar mvar -- wait for transfer thread
- runThreadState st invalidateCache
+ (Just remote, Just file) -> do
+ now <- getCurrentTime
+ pid <- forkProcess $ unsafeRunThreadState st $
+ doCommand $ cmd remote False file (transferKey t)
+ adjustTransfers dstatus $
+ M.insertWith' const t info
+ { startedTime = Just now
+ , transferPid = Just pid
+ }
diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs
index 12ab8ff11..54f98da5c 100644
--- a/Logs/Transfer.hs
+++ b/Logs/Transfer.hs
@@ -14,7 +14,6 @@ import qualified Git
import Types.Remote
import qualified Fields
-import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock
@@ -36,7 +35,6 @@ data Transfer = Transfer
data TransferInfo = TransferInfo
{ startedTime :: Maybe UTCTime
, transferPid :: Maybe ProcessID
- , transferThread :: Maybe ThreadId
, transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath
@@ -79,7 +77,6 @@ transfer t file a = do
info <- liftIO $ TransferInfo
<$> (Just <$> getCurrentTime)
<*> pure Nothing -- pid not stored in file, so omitted for speed
- <*> pure Nothing -- threadid not stored in file, so omitted for speed
<*> pure Nothing -- not 0; transfer may be resuming
<*> pure Nothing
<*> pure file
@@ -158,7 +155,6 @@ writeTransferInfo :: TransferInfo -> String
writeTransferInfo info = unlines
-- transferPid is not included; instead obtained by looking at
-- the process that locks the file.
- -- transferThread is not included; not relevant for other processes
[ show $ startedTime info
-- bytesComplete is not included; changes too fast
, fromMaybe "" $ associatedFile info -- comes last; arbitrary content
@@ -172,7 +168,6 @@ readTransferInfo pid s =
<*> pure (Just pid)
<*> pure Nothing
<*> pure Nothing
- <*> pure Nothing
<*> pure (if null filename then Nothing else Just filename)
_ -> Nothing
where