summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/Threads/Transferrer.hs51
-rw-r--r--Assistant/TransferSlots.hs44
-rw-r--r--Assistant/WebApp/DashBoard.hs7
3 files changed, 52 insertions, 50 deletions
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 9a772d628..d4c00afd8 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -33,21 +33,23 @@ maxTransfers = 1
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
transfererThread st dstatus transferqueue slots = go =<< readProgramFile
where
- go program = getNextTransfer transferqueue dstatus notrunning >>= handle program
- handle program Nothing = go program
- handle program (Just (t, info)) = do
- ifM (runThreadState st $ shouldTransfer t info)
- ( do
- debug thisThread [ "Transferring:" , show t ]
- notifyTransfer dstatus
- transferThread dstatus slots t info inTransferSlot program
- , do
- debug thisThread [ "Skipping unnecessary transfer:" , show t ]
- -- getNextTransfer added t to the
- -- daemonstatus's transfer map.
- void $ removeTransfer dstatus t
- )
- go program
+ go program = forever $ inTransferSlot dstatus slots $
+ getNextTransfer transferqueue dstatus notrunning
+ >>= handle program
+ handle _ Nothing = return Nothing
+ handle program (Just (t, info)) = ifM (runThreadState st $ shouldTransfer t info)
+ ( do
+ debug thisThread [ "Transferring:" , show t ]
+ notifyTransfer dstatus
+ let a = doTransfer dstatus t info program
+ return $ Just (t, info, a)
+ , do
+ debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ -- getNextTransfer added t to the
+ -- daemonstatus's transfer map.
+ void $ removeTransfer dstatus t
+ return Nothing
+ )
{- Skip transfers that are already running. -}
notrunning i = startedTime i == Nothing
@@ -70,24 +72,11 @@ shouldTransfer t info
where
key = transferKey t
-{- A sepeate git-annex process is forked off to run a transfer,
- - running in its own process group. This allows killing it and all its
- - children if the user decides to cancel the transfer.
- -
- - A thread is forked off to run the process, and the thread
- - occupies one of the transfer slots. If all slots are in use, this will
- - block until one becomes available. The thread's id is also recorded in
- - the transfer info; the thread will also be killed when a transfer is
- - stopped, to avoid it displaying any alert about the transfer having
- - failed. -}
-transferThread :: DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> TransferSlotRunner -> FilePath -> IO ()
-transferThread dstatus slots t info runner program = case (transferRemote info, associatedFile info) of
+doTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> FilePath -> IO ()
+doTransfer dstatus t info program = case (transferRemote info, associatedFile info) of
(Nothing, _) -> noop
(_, Nothing) -> noop
- (Just remote, Just file) -> do
- tid <- runner slots $
- transferprocess remote file
- updateTransferInfo dstatus t $ info { transferTid = Just tid }
+ (Just remote, Just file) -> transferprocess remote file
where
direction = transferDirection t
isdownload = direction == Download
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 27b869f1d..8e24d730c 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -9,13 +9,15 @@
module Assistant.TransferSlots where
+import Common.Annex
+import Utility.ThreadScheduler
+import Assistant.DaemonStatus
+import Logs.Transfer
+
import qualified Control.Exception as E
import Control.Concurrent
import Data.Typeable
-import Common.Annex
-import Utility.ThreadScheduler
-
type TransferSlots = QSemN
{- A special exception that can be thrown to pause or resume a transfer, while
@@ -25,7 +27,8 @@ data TransferException = PauseTransfer | ResumeTransfer
instance E.Exception TransferException
-type TransferSlotRunner = TransferSlots -> IO () -> IO ThreadId
+type TransferSlotRunner = DaemonStatusHandle -> TransferSlots -> TransferGenerator -> IO ()
+type TransferGenerator = IO (Maybe (Transfer, TransferInfo, IO ()))
{- Number of concurrent transfers allowed to be run from the assistant.
-
@@ -38,31 +41,40 @@ numSlots = 1
newTransferSlots :: IO TransferSlots
newTransferSlots = newQSemN numSlots
-{- Waits until a transfer slot becomes available, and runs a transfer
- - action in the slot, in its own thread.
+{- Waits until a transfer slot becomes available, then runs a
+ - TransferGenerator, and then runs the transfer action in its own thread.
-}
inTransferSlot :: TransferSlotRunner
-inTransferSlot = runTransferSlot (\s -> waitQSemN s 1)
+inTransferSlot dstatus s gen = do
+ waitQSemN s 1
+ runTransferThread dstatus s =<< gen
-{- Runs a transfer action, without waiting for a slot to become available. -}
+{- Runs a TransferGenerator, and its transfer action,
+ - without waiting for a slot to become available. -}
inImmediateTransferSlot :: TransferSlotRunner
-inImmediateTransferSlot = runTransferSlot (\s -> signalQSemN s (-1))
+inImmediateTransferSlot dstatus s gen = do
+ signalQSemN s (-1)
+ runTransferThread dstatus s =<< gen
-{- Note that the action is subject to being killed when the transfer
+{- Runs a transfer action, in an already allocated transfer slot.
+ - Once it finishes, frees the transfer slot.
+ -
+ - Note that the action is subject to being killed when the transfer
- is canceled or paused.
-
- A PauseTransfer exception is handled by letting the action be killed,
- then pausing the thread until a ResumeTransfer exception is raised,
- then rerunning the action.
-}
-runTransferSlot :: (QSemN -> IO ()) -> TransferSlotRunner
-runTransferSlot allocator s transfer = do
- allocator s
- forkIO $ E.bracket_ noop (signalQSemN s 1) go
+runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO ()
+runTransferThread _ s Nothing = signalQSemN s 1
+runTransferThread dstatus s (Just (t, info, a)) = do
+ tid <- forkIO $ E.bracket_ noop (signalQSemN s 1) go
+ updateTransferInfo dstatus t $ info { transferTid = Just tid }
where
- go = catchPauseResume transfer
+ go = catchPauseResume a
pause = catchPauseResume $ runEvery (Seconds 86400) noop
- catchPauseResume a = E.catch a handlePauseResume
+ catchPauseResume a' = E.catch a' handlePauseResume
handlePauseResume PauseTransfer = do
putStrLn "pause"
pause
diff --git a/Assistant/WebApp/DashBoard.hs b/Assistant/WebApp/DashBoard.hs
index 949793121..e51708d64 100644
--- a/Assistant/WebApp/DashBoard.hs
+++ b/Assistant/WebApp/DashBoard.hs
@@ -210,9 +210,10 @@ startTransfer t = do
- forget that old pid, and start a new one. -}
liftIO $ updateTransferInfo dstatus t $ info
{ transferPid = Nothing }
- liftIO $ Transferrer.transferThread
- dstatus slots t info inImmediateTransferSlot
- =<< readProgramFile
+ liftIO $ inImmediateTransferSlot dstatus slots $ do
+ program <- readProgramFile
+ let a = Transferrer.doTransfer dstatus t info program
+ return $ Just (t, info, a)
getCurrentTransfers :: Handler TransferMap
getCurrentTransfers = currentTransfers