diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Sync.hs | 41 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 29 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 102 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 8 | ||||
-rw-r--r-- | Assistant/TransferSlots.hs | 199 |
5 files changed, 245 insertions, 134 deletions
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 43f0309fe..6a66802d5 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -23,9 +23,17 @@ import qualified Git.Command import qualified Git.Ref import qualified Remote import qualified Types.Remote as Remote +import qualified Remote.List as Remote import qualified Annex.Branch import Annex.UUID import Annex.TaggedPush +import qualified Config +import Git.Config +import Assistant.NamedThread +import Assistant.Threads.Watcher (watchThread, WatcherControl(..)) +import Assistant.TransferSlots +import Assistant.TransferQueue +import Logs.Transfer import Data.Time.Clock import qualified Data.Map as M @@ -233,3 +241,36 @@ syncRemote remote = do reconnectRemotes False [remote] addScanRemotes True [remote] void $ liftIO $ forkIO $ thread + +{- Use Nothing to change autocommit setting; or a remote to change + - its sync setting. -} +changeSyncable :: Maybe Remote -> Bool -> Assistant () +changeSyncable Nothing enable = do + liftAnnex $ Config.setConfig key (boolConfig enable) + liftIO . maybe noop (`throwTo` signal) + =<< namedThreadId watchThread + where + key = Config.annexConfig "autocommit" + signal + | enable = ResumeWatcher + | otherwise = PauseWatcher +changeSyncable (Just r) True = do + liftAnnex $ changeSyncFlag r True + syncRemote r +changeSyncable (Just r) False = do + liftAnnex $ changeSyncFlag r False + updateSyncRemotes + {- Stop all transfers to or from this remote. + - XXX Can't stop any ongoing scan, or git syncs. -} + void $ dequeueTransfers tofrom + mapM_ (cancelTransfer False) =<< + filter tofrom . M.keys . currentTransfers <$> getDaemonStatus + where + tofrom t = transferUUID t == Remote.uuid r + +changeSyncFlag :: Remote -> Bool -> Annex () +changeSyncFlag r enabled = do + Config.setConfig key (boolConfig enabled) + void Remote.remoteListRefresh + where + key = Config.remoteConfig (Remote.repo r) "sync" diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 9bc851d4e..fc09373e7 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -9,9 +9,7 @@ module Assistant.Threads.TransferWatcher where import Assistant.Common import Assistant.DaemonStatus -import Assistant.TransferQueue -import Assistant.Drop -import Annex.Content +import Assistant.TransferSlots import Logs.Transfer import Utility.DirWatcher import Utility.DirWatcher.Types @@ -98,28 +96,3 @@ onDel file = case parseTransferFile file of - runs. -} threadDelay 10000000 -- 10 seconds finished t minfo - -{- Queue uploads of files downloaded to us, spreading them - - out to other reachable remotes. - - - - Downloading a file may have caused a remote to not want it; - - so check for drops from remotes. - - - - Uploading a file may cause the local repo, or some other remote to not - - want it; handle that too. - -} -finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () -finishedTransfer t (Just info) - | transferDirection t == Download = - whenM (liftAnnex $ inAnnex $ transferKey t) $ do - dodrops False - queueTransfersMatching (/= transferUUID t) - "newly received object" - Later (transferKey t) (associatedFile info) Upload - | otherwise = dodrops True - where - dodrops fromhere = handleDrops - ("drop wanted after " ++ describeTransfer t info) - fromhere (transferKey t) (associatedFile info) Nothing -finishedTransfer _ _ = noop - diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 98f8b6ad7..82f3f3e10 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -36,105 +36,3 @@ transfererThread = namedThread "Transferrer" $ do where {- Skip transfers that are already running. -} notrunning = isNothing . startedTime - -{- By the time this is called, the daemonstatus's currentTransfers map should - - already have been updated to include the transfer. -} -genTransfer :: Transfer -> TransferInfo -> TransferGenerator -genTransfer t info = case (transferRemote info, associatedFile info) of - (Just remote, Just file) - | Git.repoIsLocalUnknown (Remote.repo remote) -> do - -- optimisation for removable drives not plugged in - liftAnnex $ recordFailedTransfer t info - void $ removeTransfer t - return Nothing - | otherwise -> ifM (liftAnnex $ shouldTransfer t info) - ( do - debug [ "Transferring:" , describeTransfer t info ] - notifyTransfer - return $ Just (t, info, go remote file) - , do - debug [ "Skipping unnecessary transfer:", - describeTransfer t info ] - void $ removeTransfer t - finishedTransfer t (Just info) - return Nothing - ) - _ -> return Nothing - where - direction = transferDirection t - isdownload = direction == Download - - {- Alerts are only shown for successful transfers. - - Transfers can temporarily fail for many reasons, - - so there's no point in bothering the user about - - those. The assistant should recover. - - - - After a successful upload, handle dropping it from - - here, if desired. In this case, the remote it was - - uploaded to is known to have it. - - - - Also, after a successful transfer, the location - - log has changed. Indicate that a commit has been - - made, in order to queue a push of the git-annex - - branch out to remotes that did not participate - - in the transfer. - - - - If the process failed, it could have crashed, - - so remove the transfer from the list of current - - transfers, just in case it didn't stop - - in a way that lets the TransferWatcher do its - - usual cleanup. However, first check if something else is - - running the transfer, to avoid removing active transfers. - -} - go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info) - ( do - void $ addAlert $ makeAlertFiller True $ - transferFileAlert direction True file - unless isdownload $ - handleDrops - ("object uploaded to " ++ show remote) - True (transferKey t) - (associatedFile info) - (Just remote) - void recordCommit - , whenM (liftAnnex $ isNothing <$> checkTransfer t) $ - void $ removeTransfer t - ) - -{- Called right before a transfer begins, this is a last chance to avoid - - unnecessary transfers. - - - - For downloads, we obviously don't need to download if the already - - have the object. - - - - Smilarly, for uploads, check if the remote is known to already have - - the object. - - - - Also, uploads get queued to all remotes, in order of cost. - - This may mean, for example, that an object is uploaded over the LAN - - to a locally paired client, and once that upload is done, a more - - expensive transfer remote no longer wants the object. (Since - - all the clients have it already.) So do one last check if this is still - - preferred content. - - - - We'll also do one last preferred content check for downloads. An - - example of a case where this could be needed is if a download is queued - - for a file that gets moved out of an archive directory -- but before - - that download can happen, the file is put back in the archive. - -} -shouldTransfer :: Transfer -> TransferInfo -> Annex Bool -shouldTransfer t info - | transferDirection t == Download = - (not <$> inAnnex key) <&&> wantGet True file - | transferDirection t == Upload = case transferRemote info of - Nothing -> return False - Just r -> notinremote r - <&&> wantSend True file (Remote.uuid r) - | otherwise = return False - where - key = transferKey t - file = associatedFile info - - {- Trust the location log to check if the remote already has - - the key. This avoids a roundtrip to the remote. -} - notinremote r = notElem (Remote.uuid r) <$> loggedLocations key diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index a44664639..3eedbe145 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -9,7 +9,7 @@ module Assistant.Threads.Watcher ( watchThread, - WatcherException(..), + WatcherControl(..), checkCanWatch, needLsof, onAddSymlink, @@ -64,10 +64,10 @@ needLsof = error $ unlines ] {- A special exception that can be thrown to pause or resume the watcher. -} -data WatcherException = PauseWatcher | ResumeWatcher +data WatcherControl = PauseWatcher | ResumeWatcher deriving (Show, Eq, Typeable) -instance E.Exception WatcherException +instance E.Exception WatcherControl watchThread :: NamedThread watchThread = namedThread "Watcher" $ @@ -107,7 +107,7 @@ runWatcher = do where hook a = Just <$> asIO2 (runHandler a) -waitFor :: WatcherException -> Assistant () -> Assistant () +waitFor :: WatcherControl -> Assistant () -> Assistant () waitFor sig next = do r <- liftIO (E.try pause :: IO (Either E.SomeException ())) case r of diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 81a778a0a..36d557c3d 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -13,11 +13,27 @@ import Assistant.Types.TransferSlots import Assistant.DaemonStatus import Assistant.TransferrerPool import Assistant.Types.TransferrerPool +import Assistant.Types.TransferQueue +import Assistant.TransferQueue +import Assistant.Alert +import Assistant.Alert.Utility +import Assistant.Commits +import Assistant.Drop import Logs.Transfer +import Logs.Location +import qualified Git +import qualified Remote +import qualified Types.Remote as Remote +import Annex.Content +import Annex.Wanted +import Config.Files +import qualified Data.Map as M import qualified Control.Exception as E import Control.Concurrent import qualified Control.Concurrent.MSemN as MSemN +import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL) +import System.Posix.Process (getProcessGroupIDOf) type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ())) @@ -76,3 +92,186 @@ runTransferThread' program d run = go _ -> done done = runAssistant d $ flip MSemN.signal 1 <<~ transferSlots + +{- By the time this is called, the daemonstatus's currentTransfers map should + - already have been updated to include the transfer. -} +genTransfer :: Transfer -> TransferInfo -> TransferGenerator +genTransfer t info = case (transferRemote info, associatedFile info) of + (Just remote, Just file) + | Git.repoIsLocalUnknown (Remote.repo remote) -> do + -- optimisation for removable drives not plugged in + liftAnnex $ recordFailedTransfer t info + void $ removeTransfer t + return Nothing + | otherwise -> ifM (liftAnnex $ shouldTransfer t info) + ( do + debug [ "Transferring:" , describeTransfer t info ] + notifyTransfer + return $ Just (t, info, go remote file) + , do + debug [ "Skipping unnecessary transfer:", + describeTransfer t info ] + void $ removeTransfer t + finishedTransfer t (Just info) + return Nothing + ) + _ -> return Nothing + where + direction = transferDirection t + isdownload = direction == Download + + {- Alerts are only shown for successful transfers. + - Transfers can temporarily fail for many reasons, + - so there's no point in bothering the user about + - those. The assistant should recover. + - + - After a successful upload, handle dropping it from + - here, if desired. In this case, the remote it was + - uploaded to is known to have it. + - + - Also, after a successful transfer, the location + - log has changed. Indicate that a commit has been + - made, in order to queue a push of the git-annex + - branch out to remotes that did not participate + - in the transfer. + - + - If the process failed, it could have crashed, + - so remove the transfer from the list of current + - transfers, just in case it didn't stop + - in a way that lets the TransferWatcher do its + - usual cleanup. However, first check if something else is + - running the transfer, to avoid removing active transfers. + -} + go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info) + ( do + void $ addAlert $ makeAlertFiller True $ + transferFileAlert direction True file + unless isdownload $ + handleDrops + ("object uploaded to " ++ show remote) + True (transferKey t) + (associatedFile info) + (Just remote) + void recordCommit + , whenM (liftAnnex $ isNothing <$> checkTransfer t) $ + void $ removeTransfer t + ) + +{- Called right before a transfer begins, this is a last chance to avoid + - unnecessary transfers. + - + - For downloads, we obviously don't need to download if the already + - have the object. + - + - Smilarly, for uploads, check if the remote is known to already have + - the object. + - + - Also, uploads get queued to all remotes, in order of cost. + - This may mean, for example, that an object is uploaded over the LAN + - to a locally paired client, and once that upload is done, a more + - expensive transfer remote no longer wants the object. (Since + - all the clients have it already.) So do one last check if this is still + - preferred content. + - + - We'll also do one last preferred content check for downloads. An + - example of a case where this could be needed is if a download is queued + - for a file that gets moved out of an archive directory -- but before + - that download can happen, the file is put back in the archive. + -} +shouldTransfer :: Transfer -> TransferInfo -> Annex Bool +shouldTransfer t info + | transferDirection t == Download = + (not <$> inAnnex key) <&&> wantGet True file + | transferDirection t == Upload = case transferRemote info of + Nothing -> return False + Just r -> notinremote r + <&&> wantSend True file (Remote.uuid r) + | otherwise = return False + where + key = transferKey t + file = associatedFile info + + {- Trust the location log to check if the remote already has + - the key. This avoids a roundtrip to the remote. -} + notinremote r = notElem (Remote.uuid r) <$> loggedLocations key + +{- Queue uploads of files downloaded to us, spreading them + - out to other reachable remotes. + - + - Downloading a file may have caused a remote to not want it; + - so check for drops from remotes. + - + - Uploading a file may cause the local repo, or some other remote to not + - want it; handle that too. + -} +finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () +finishedTransfer t (Just info) + | transferDirection t == Download = + whenM (liftAnnex $ inAnnex $ transferKey t) $ do + dodrops False + queueTransfersMatching (/= transferUUID t) + "newly received object" + Later (transferKey t) (associatedFile info) Upload + | otherwise = dodrops True + where + dodrops fromhere = handleDrops + ("drop wanted after " ++ describeTransfer t info) + fromhere (transferKey t) (associatedFile info) Nothing +finishedTransfer _ _ = noop + +{- Pause a running transfer. -} +pauseTransfer :: Transfer -> Assistant () +pauseTransfer = cancelTransfer True + +{- Cancel a running transfer. -} +cancelTransfer :: Bool -> Transfer -> Assistant () +cancelTransfer pause t = do + m <- getCurrentTransfers + unless pause $ + {- remove queued transfer -} + void $ dequeueTransfers $ equivilantTransfer t + {- stop running transfer -} + maybe noop stop (M.lookup t m) + where + stop info = do + {- When there's a thread associated with the + - transfer, it's signaled first, to avoid it + - displaying any alert about the transfer having + - failed when the transfer process is killed. -} + liftIO $ maybe noop signalthread $ transferTid info + liftIO $ maybe noop killproc $ transferPid info + if pause + then void $ alterTransferInfo t $ + \i -> i { transferPaused = True } + else void $ removeTransfer t + signalthread tid + | pause = throwTo tid PauseTransfer + | otherwise = killThread tid + {- In order to stop helper processes like rsync, + - kill the whole process group of the process running the transfer. -} + killproc pid = void $ tryIO $ do + g <- getProcessGroupIDOf pid + void $ tryIO $ signalProcessGroup sigTERM g + threadDelay 50000 -- 0.05 second grace period + void $ tryIO $ signalProcessGroup sigKILL g + +{- Start or resume a transfer. -} +startTransfer :: Transfer -> Assistant () +startTransfer t = do + m <- getCurrentTransfers + maybe startqueued go (M.lookup t m) + where + go info = maybe (start info) resume $ transferTid info + startqueued = do + is <- map snd <$> getMatchingTransfers (== t) + maybe noop start $ headMaybe is + resume tid = do + alterTransferInfo t $ \i -> i { transferPaused = False } + liftIO $ throwTo tid ResumeTransfer + start info = do + program <- liftIO readProgramFile + inImmediateTransferSlot program $ + genTransfer t info + +getCurrentTransfers :: Assistant TransferMap +getCurrentTransfers = currentTransfers <$> getDaemonStatus |