summaryrefslogtreecommitdiff
path: root/Assistant/TransferSlots.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/TransferSlots.hs')
-rw-r--r--Assistant/TransferSlots.hs199
1 files changed, 199 insertions, 0 deletions
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 81a778a0a..36d557c3d 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -13,11 +13,27 @@ import Assistant.Types.TransferSlots
import Assistant.DaemonStatus
import Assistant.TransferrerPool
import Assistant.Types.TransferrerPool
+import Assistant.Types.TransferQueue
+import Assistant.TransferQueue
+import Assistant.Alert
+import Assistant.Alert.Utility
+import Assistant.Commits
+import Assistant.Drop
import Logs.Transfer
+import Logs.Location
+import qualified Git
+import qualified Remote
+import qualified Types.Remote as Remote
+import Annex.Content
+import Annex.Wanted
+import Config.Files
+import qualified Data.Map as M
import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
+import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
+import System.Posix.Process (getProcessGroupIDOf)
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
@@ -76,3 +92,186 @@ runTransferThread' program d run = go
_ -> done
done = runAssistant d $
flip MSemN.signal 1 <<~ transferSlots
+
+{- By the time this is called, the daemonstatus's currentTransfers map should
+ - already have been updated to include the transfer. -}
+genTransfer :: Transfer -> TransferInfo -> TransferGenerator
+genTransfer t info = case (transferRemote info, associatedFile info) of
+ (Just remote, Just file)
+ | Git.repoIsLocalUnknown (Remote.repo remote) -> do
+ -- optimisation for removable drives not plugged in
+ liftAnnex $ recordFailedTransfer t info
+ void $ removeTransfer t
+ return Nothing
+ | otherwise -> ifM (liftAnnex $ shouldTransfer t info)
+ ( do
+ debug [ "Transferring:" , describeTransfer t info ]
+ notifyTransfer
+ return $ Just (t, info, go remote file)
+ , do
+ debug [ "Skipping unnecessary transfer:",
+ describeTransfer t info ]
+ void $ removeTransfer t
+ finishedTransfer t (Just info)
+ return Nothing
+ )
+ _ -> return Nothing
+ where
+ direction = transferDirection t
+ isdownload = direction == Download
+
+ {- Alerts are only shown for successful transfers.
+ - Transfers can temporarily fail for many reasons,
+ - so there's no point in bothering the user about
+ - those. The assistant should recover.
+ -
+ - After a successful upload, handle dropping it from
+ - here, if desired. In this case, the remote it was
+ - uploaded to is known to have it.
+ -
+ - Also, after a successful transfer, the location
+ - log has changed. Indicate that a commit has been
+ - made, in order to queue a push of the git-annex
+ - branch out to remotes that did not participate
+ - in the transfer.
+ -
+ - If the process failed, it could have crashed,
+ - so remove the transfer from the list of current
+ - transfers, just in case it didn't stop
+ - in a way that lets the TransferWatcher do its
+ - usual cleanup. However, first check if something else is
+ - running the transfer, to avoid removing active transfers.
+ -}
+ go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
+ ( do
+ void $ addAlert $ makeAlertFiller True $
+ transferFileAlert direction True file
+ unless isdownload $
+ handleDrops
+ ("object uploaded to " ++ show remote)
+ True (transferKey t)
+ (associatedFile info)
+ (Just remote)
+ void recordCommit
+ , whenM (liftAnnex $ isNothing <$> checkTransfer t) $
+ void $ removeTransfer t
+ )
+
+{- Called right before a transfer begins, this is a last chance to avoid
+ - unnecessary transfers.
+ -
+ - For downloads, we obviously don't need to download if the already
+ - have the object.
+ -
+ - Smilarly, for uploads, check if the remote is known to already have
+ - the object.
+ -
+ - Also, uploads get queued to all remotes, in order of cost.
+ - This may mean, for example, that an object is uploaded over the LAN
+ - to a locally paired client, and once that upload is done, a more
+ - expensive transfer remote no longer wants the object. (Since
+ - all the clients have it already.) So do one last check if this is still
+ - preferred content.
+ -
+ - We'll also do one last preferred content check for downloads. An
+ - example of a case where this could be needed is if a download is queued
+ - for a file that gets moved out of an archive directory -- but before
+ - that download can happen, the file is put back in the archive.
+ -}
+shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
+shouldTransfer t info
+ | transferDirection t == Download =
+ (not <$> inAnnex key) <&&> wantGet True file
+ | transferDirection t == Upload = case transferRemote info of
+ Nothing -> return False
+ Just r -> notinremote r
+ <&&> wantSend True file (Remote.uuid r)
+ | otherwise = return False
+ where
+ key = transferKey t
+ file = associatedFile info
+
+ {- Trust the location log to check if the remote already has
+ - the key. This avoids a roundtrip to the remote. -}
+ notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
+
+{- Queue uploads of files downloaded to us, spreading them
+ - out to other reachable remotes.
+ -
+ - Downloading a file may have caused a remote to not want it;
+ - so check for drops from remotes.
+ -
+ - Uploading a file may cause the local repo, or some other remote to not
+ - want it; handle that too.
+ -}
+finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
+finishedTransfer t (Just info)
+ | transferDirection t == Download =
+ whenM (liftAnnex $ inAnnex $ transferKey t) $ do
+ dodrops False
+ queueTransfersMatching (/= transferUUID t)
+ "newly received object"
+ Later (transferKey t) (associatedFile info) Upload
+ | otherwise = dodrops True
+ where
+ dodrops fromhere = handleDrops
+ ("drop wanted after " ++ describeTransfer t info)
+ fromhere (transferKey t) (associatedFile info) Nothing
+finishedTransfer _ _ = noop
+
+{- Pause a running transfer. -}
+pauseTransfer :: Transfer -> Assistant ()
+pauseTransfer = cancelTransfer True
+
+{- Cancel a running transfer. -}
+cancelTransfer :: Bool -> Transfer -> Assistant ()
+cancelTransfer pause t = do
+ m <- getCurrentTransfers
+ unless pause $
+ {- remove queued transfer -}
+ void $ dequeueTransfers $ equivilantTransfer t
+ {- stop running transfer -}
+ maybe noop stop (M.lookup t m)
+ where
+ stop info = do
+ {- When there's a thread associated with the
+ - transfer, it's signaled first, to avoid it
+ - displaying any alert about the transfer having
+ - failed when the transfer process is killed. -}
+ liftIO $ maybe noop signalthread $ transferTid info
+ liftIO $ maybe noop killproc $ transferPid info
+ if pause
+ then void $ alterTransferInfo t $
+ \i -> i { transferPaused = True }
+ else void $ removeTransfer t
+ signalthread tid
+ | pause = throwTo tid PauseTransfer
+ | otherwise = killThread tid
+ {- In order to stop helper processes like rsync,
+ - kill the whole process group of the process running the transfer. -}
+ killproc pid = void $ tryIO $ do
+ g <- getProcessGroupIDOf pid
+ void $ tryIO $ signalProcessGroup sigTERM g
+ threadDelay 50000 -- 0.05 second grace period
+ void $ tryIO $ signalProcessGroup sigKILL g
+
+{- Start or resume a transfer. -}
+startTransfer :: Transfer -> Assistant ()
+startTransfer t = do
+ m <- getCurrentTransfers
+ maybe startqueued go (M.lookup t m)
+ where
+ go info = maybe (start info) resume $ transferTid info
+ startqueued = do
+ is <- map snd <$> getMatchingTransfers (== t)
+ maybe noop start $ headMaybe is
+ resume tid = do
+ alterTransferInfo t $ \i -> i { transferPaused = False }
+ liftIO $ throwTo tid ResumeTransfer
+ start info = do
+ program <- liftIO readProgramFile
+ inImmediateTransferSlot program $
+ genTransfer t info
+
+getCurrentTransfers :: Assistant TransferMap
+getCurrentTransfers = currentTransfers <$> getDaemonStatus