diff options
Diffstat (limited to 'Assistant/TransferSlots.hs')
-rw-r--r-- | Assistant/TransferSlots.hs | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 81a778a0a..36d557c3d 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -13,11 +13,27 @@ import Assistant.Types.TransferSlots import Assistant.DaemonStatus import Assistant.TransferrerPool import Assistant.Types.TransferrerPool +import Assistant.Types.TransferQueue +import Assistant.TransferQueue +import Assistant.Alert +import Assistant.Alert.Utility +import Assistant.Commits +import Assistant.Drop import Logs.Transfer +import Logs.Location +import qualified Git +import qualified Remote +import qualified Types.Remote as Remote +import Annex.Content +import Annex.Wanted +import Config.Files +import qualified Data.Map as M import qualified Control.Exception as E import Control.Concurrent import qualified Control.Concurrent.MSemN as MSemN +import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL) +import System.Posix.Process (getProcessGroupIDOf) type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ())) @@ -76,3 +92,186 @@ runTransferThread' program d run = go _ -> done done = runAssistant d $ flip MSemN.signal 1 <<~ transferSlots + +{- By the time this is called, the daemonstatus's currentTransfers map should + - already have been updated to include the transfer. -} +genTransfer :: Transfer -> TransferInfo -> TransferGenerator +genTransfer t info = case (transferRemote info, associatedFile info) of + (Just remote, Just file) + | Git.repoIsLocalUnknown (Remote.repo remote) -> do + -- optimisation for removable drives not plugged in + liftAnnex $ recordFailedTransfer t info + void $ removeTransfer t + return Nothing + | otherwise -> ifM (liftAnnex $ shouldTransfer t info) + ( do + debug [ "Transferring:" , describeTransfer t info ] + notifyTransfer + return $ Just (t, info, go remote file) + , do + debug [ "Skipping unnecessary transfer:", + describeTransfer t info ] + void $ removeTransfer t + finishedTransfer t (Just info) + return Nothing + ) + _ -> return Nothing + where + direction = transferDirection t + isdownload = direction == Download + + {- Alerts are only shown for successful transfers. + - Transfers can temporarily fail for many reasons, + - so there's no point in bothering the user about + - those. The assistant should recover. + - + - After a successful upload, handle dropping it from + - here, if desired. In this case, the remote it was + - uploaded to is known to have it. + - + - Also, after a successful transfer, the location + - log has changed. Indicate that a commit has been + - made, in order to queue a push of the git-annex + - branch out to remotes that did not participate + - in the transfer. + - + - If the process failed, it could have crashed, + - so remove the transfer from the list of current + - transfers, just in case it didn't stop + - in a way that lets the TransferWatcher do its + - usual cleanup. However, first check if something else is + - running the transfer, to avoid removing active transfers. + -} + go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info) + ( do + void $ addAlert $ makeAlertFiller True $ + transferFileAlert direction True file + unless isdownload $ + handleDrops + ("object uploaded to " ++ show remote) + True (transferKey t) + (associatedFile info) + (Just remote) + void recordCommit + , whenM (liftAnnex $ isNothing <$> checkTransfer t) $ + void $ removeTransfer t + ) + +{- Called right before a transfer begins, this is a last chance to avoid + - unnecessary transfers. + - + - For downloads, we obviously don't need to download if the already + - have the object. + - + - Smilarly, for uploads, check if the remote is known to already have + - the object. + - + - Also, uploads get queued to all remotes, in order of cost. + - This may mean, for example, that an object is uploaded over the LAN + - to a locally paired client, and once that upload is done, a more + - expensive transfer remote no longer wants the object. (Since + - all the clients have it already.) So do one last check if this is still + - preferred content. + - + - We'll also do one last preferred content check for downloads. An + - example of a case where this could be needed is if a download is queued + - for a file that gets moved out of an archive directory -- but before + - that download can happen, the file is put back in the archive. + -} +shouldTransfer :: Transfer -> TransferInfo -> Annex Bool +shouldTransfer t info + | transferDirection t == Download = + (not <$> inAnnex key) <&&> wantGet True file + | transferDirection t == Upload = case transferRemote info of + Nothing -> return False + Just r -> notinremote r + <&&> wantSend True file (Remote.uuid r) + | otherwise = return False + where + key = transferKey t + file = associatedFile info + + {- Trust the location log to check if the remote already has + - the key. This avoids a roundtrip to the remote. -} + notinremote r = notElem (Remote.uuid r) <$> loggedLocations key + +{- Queue uploads of files downloaded to us, spreading them + - out to other reachable remotes. + - + - Downloading a file may have caused a remote to not want it; + - so check for drops from remotes. + - + - Uploading a file may cause the local repo, or some other remote to not + - want it; handle that too. + -} +finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () +finishedTransfer t (Just info) + | transferDirection t == Download = + whenM (liftAnnex $ inAnnex $ transferKey t) $ do + dodrops False + queueTransfersMatching (/= transferUUID t) + "newly received object" + Later (transferKey t) (associatedFile info) Upload + | otherwise = dodrops True + where + dodrops fromhere = handleDrops + ("drop wanted after " ++ describeTransfer t info) + fromhere (transferKey t) (associatedFile info) Nothing +finishedTransfer _ _ = noop + +{- Pause a running transfer. -} +pauseTransfer :: Transfer -> Assistant () +pauseTransfer = cancelTransfer True + +{- Cancel a running transfer. -} +cancelTransfer :: Bool -> Transfer -> Assistant () +cancelTransfer pause t = do + m <- getCurrentTransfers + unless pause $ + {- remove queued transfer -} + void $ dequeueTransfers $ equivilantTransfer t + {- stop running transfer -} + maybe noop stop (M.lookup t m) + where + stop info = do + {- When there's a thread associated with the + - transfer, it's signaled first, to avoid it + - displaying any alert about the transfer having + - failed when the transfer process is killed. -} + liftIO $ maybe noop signalthread $ transferTid info + liftIO $ maybe noop killproc $ transferPid info + if pause + then void $ alterTransferInfo t $ + \i -> i { transferPaused = True } + else void $ removeTransfer t + signalthread tid + | pause = throwTo tid PauseTransfer + | otherwise = killThread tid + {- In order to stop helper processes like rsync, + - kill the whole process group of the process running the transfer. -} + killproc pid = void $ tryIO $ do + g <- getProcessGroupIDOf pid + void $ tryIO $ signalProcessGroup sigTERM g + threadDelay 50000 -- 0.05 second grace period + void $ tryIO $ signalProcessGroup sigKILL g + +{- Start or resume a transfer. -} +startTransfer :: Transfer -> Assistant () +startTransfer t = do + m <- getCurrentTransfers + maybe startqueued go (M.lookup t m) + where + go info = maybe (start info) resume $ transferTid info + startqueued = do + is <- map snd <$> getMatchingTransfers (== t) + maybe noop start $ headMaybe is + resume tid = do + alterTransferInfo t $ \i -> i { transferPaused = False } + liftIO $ throwTo tid ResumeTransfer + start info = do + program <- liftIO readProgramFile + inImmediateTransferSlot program $ + genTransfer t info + +getCurrentTransfers :: Assistant TransferMap +getCurrentTransfers = currentTransfers <$> getDaemonStatus |