summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/Sync.hs41
-rw-r--r--Assistant/Threads/TransferWatcher.hs29
-rw-r--r--Assistant/Threads/Transferrer.hs102
-rw-r--r--Assistant/Threads/Watcher.hs8
-rw-r--r--Assistant/TransferSlots.hs199
5 files changed, 245 insertions, 134 deletions
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index 43f0309fe..6a66802d5 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -23,9 +23,17 @@ import qualified Git.Command
import qualified Git.Ref
import qualified Remote
import qualified Types.Remote as Remote
+import qualified Remote.List as Remote
import qualified Annex.Branch
import Annex.UUID
import Annex.TaggedPush
+import qualified Config
+import Git.Config
+import Assistant.NamedThread
+import Assistant.Threads.Watcher (watchThread, WatcherControl(..))
+import Assistant.TransferSlots
+import Assistant.TransferQueue
+import Logs.Transfer
import Data.Time.Clock
import qualified Data.Map as M
@@ -233,3 +241,36 @@ syncRemote remote = do
reconnectRemotes False [remote]
addScanRemotes True [remote]
void $ liftIO $ forkIO $ thread
+
+{- Use Nothing to change autocommit setting; or a remote to change
+ - its sync setting. -}
+changeSyncable :: Maybe Remote -> Bool -> Assistant ()
+changeSyncable Nothing enable = do
+ liftAnnex $ Config.setConfig key (boolConfig enable)
+ liftIO . maybe noop (`throwTo` signal)
+ =<< namedThreadId watchThread
+ where
+ key = Config.annexConfig "autocommit"
+ signal
+ | enable = ResumeWatcher
+ | otherwise = PauseWatcher
+changeSyncable (Just r) True = do
+ liftAnnex $ changeSyncFlag r True
+ syncRemote r
+changeSyncable (Just r) False = do
+ liftAnnex $ changeSyncFlag r False
+ updateSyncRemotes
+ {- Stop all transfers to or from this remote.
+ - XXX Can't stop any ongoing scan, or git syncs. -}
+ void $ dequeueTransfers tofrom
+ mapM_ (cancelTransfer False) =<<
+ filter tofrom . M.keys . currentTransfers <$> getDaemonStatus
+ where
+ tofrom t = transferUUID t == Remote.uuid r
+
+changeSyncFlag :: Remote -> Bool -> Annex ()
+changeSyncFlag r enabled = do
+ Config.setConfig key (boolConfig enabled)
+ void Remote.remoteListRefresh
+ where
+ key = Config.remoteConfig (Remote.repo r) "sync"
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index 9bc851d4e..fc09373e7 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -9,9 +9,7 @@ module Assistant.Threads.TransferWatcher where
import Assistant.Common
import Assistant.DaemonStatus
-import Assistant.TransferQueue
-import Assistant.Drop
-import Annex.Content
+import Assistant.TransferSlots
import Logs.Transfer
import Utility.DirWatcher
import Utility.DirWatcher.Types
@@ -98,28 +96,3 @@ onDel file = case parseTransferFile file of
- runs. -}
threadDelay 10000000 -- 10 seconds
finished t minfo
-
-{- Queue uploads of files downloaded to us, spreading them
- - out to other reachable remotes.
- -
- - Downloading a file may have caused a remote to not want it;
- - so check for drops from remotes.
- -
- - Uploading a file may cause the local repo, or some other remote to not
- - want it; handle that too.
- -}
-finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
-finishedTransfer t (Just info)
- | transferDirection t == Download =
- whenM (liftAnnex $ inAnnex $ transferKey t) $ do
- dodrops False
- queueTransfersMatching (/= transferUUID t)
- "newly received object"
- Later (transferKey t) (associatedFile info) Upload
- | otherwise = dodrops True
- where
- dodrops fromhere = handleDrops
- ("drop wanted after " ++ describeTransfer t info)
- fromhere (transferKey t) (associatedFile info) Nothing
-finishedTransfer _ _ = noop
-
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 98f8b6ad7..82f3f3e10 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -36,105 +36,3 @@ transfererThread = namedThread "Transferrer" $ do
where
{- Skip transfers that are already running. -}
notrunning = isNothing . startedTime
-
-{- By the time this is called, the daemonstatus's currentTransfers map should
- - already have been updated to include the transfer. -}
-genTransfer :: Transfer -> TransferInfo -> TransferGenerator
-genTransfer t info = case (transferRemote info, associatedFile info) of
- (Just remote, Just file)
- | Git.repoIsLocalUnknown (Remote.repo remote) -> do
- -- optimisation for removable drives not plugged in
- liftAnnex $ recordFailedTransfer t info
- void $ removeTransfer t
- return Nothing
- | otherwise -> ifM (liftAnnex $ shouldTransfer t info)
- ( do
- debug [ "Transferring:" , describeTransfer t info ]
- notifyTransfer
- return $ Just (t, info, go remote file)
- , do
- debug [ "Skipping unnecessary transfer:",
- describeTransfer t info ]
- void $ removeTransfer t
- finishedTransfer t (Just info)
- return Nothing
- )
- _ -> return Nothing
- where
- direction = transferDirection t
- isdownload = direction == Download
-
- {- Alerts are only shown for successful transfers.
- - Transfers can temporarily fail for many reasons,
- - so there's no point in bothering the user about
- - those. The assistant should recover.
- -
- - After a successful upload, handle dropping it from
- - here, if desired. In this case, the remote it was
- - uploaded to is known to have it.
- -
- - Also, after a successful transfer, the location
- - log has changed. Indicate that a commit has been
- - made, in order to queue a push of the git-annex
- - branch out to remotes that did not participate
- - in the transfer.
- -
- - If the process failed, it could have crashed,
- - so remove the transfer from the list of current
- - transfers, just in case it didn't stop
- - in a way that lets the TransferWatcher do its
- - usual cleanup. However, first check if something else is
- - running the transfer, to avoid removing active transfers.
- -}
- go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
- ( do
- void $ addAlert $ makeAlertFiller True $
- transferFileAlert direction True file
- unless isdownload $
- handleDrops
- ("object uploaded to " ++ show remote)
- True (transferKey t)
- (associatedFile info)
- (Just remote)
- void recordCommit
- , whenM (liftAnnex $ isNothing <$> checkTransfer t) $
- void $ removeTransfer t
- )
-
-{- Called right before a transfer begins, this is a last chance to avoid
- - unnecessary transfers.
- -
- - For downloads, we obviously don't need to download if the already
- - have the object.
- -
- - Smilarly, for uploads, check if the remote is known to already have
- - the object.
- -
- - Also, uploads get queued to all remotes, in order of cost.
- - This may mean, for example, that an object is uploaded over the LAN
- - to a locally paired client, and once that upload is done, a more
- - expensive transfer remote no longer wants the object. (Since
- - all the clients have it already.) So do one last check if this is still
- - preferred content.
- -
- - We'll also do one last preferred content check for downloads. An
- - example of a case where this could be needed is if a download is queued
- - for a file that gets moved out of an archive directory -- but before
- - that download can happen, the file is put back in the archive.
- -}
-shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
-shouldTransfer t info
- | transferDirection t == Download =
- (not <$> inAnnex key) <&&> wantGet True file
- | transferDirection t == Upload = case transferRemote info of
- Nothing -> return False
- Just r -> notinremote r
- <&&> wantSend True file (Remote.uuid r)
- | otherwise = return False
- where
- key = transferKey t
- file = associatedFile info
-
- {- Trust the location log to check if the remote already has
- - the key. This avoids a roundtrip to the remote. -}
- notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index a44664639..3eedbe145 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -9,7 +9,7 @@
module Assistant.Threads.Watcher (
watchThread,
- WatcherException(..),
+ WatcherControl(..),
checkCanWatch,
needLsof,
onAddSymlink,
@@ -64,10 +64,10 @@ needLsof = error $ unlines
]
{- A special exception that can be thrown to pause or resume the watcher. -}
-data WatcherException = PauseWatcher | ResumeWatcher
+data WatcherControl = PauseWatcher | ResumeWatcher
deriving (Show, Eq, Typeable)
-instance E.Exception WatcherException
+instance E.Exception WatcherControl
watchThread :: NamedThread
watchThread = namedThread "Watcher" $
@@ -107,7 +107,7 @@ runWatcher = do
where
hook a = Just <$> asIO2 (runHandler a)
-waitFor :: WatcherException -> Assistant () -> Assistant ()
+waitFor :: WatcherControl -> Assistant () -> Assistant ()
waitFor sig next = do
r <- liftIO (E.try pause :: IO (Either E.SomeException ()))
case r of
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 81a778a0a..36d557c3d 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -13,11 +13,27 @@ import Assistant.Types.TransferSlots
import Assistant.DaemonStatus
import Assistant.TransferrerPool
import Assistant.Types.TransferrerPool
+import Assistant.Types.TransferQueue
+import Assistant.TransferQueue
+import Assistant.Alert
+import Assistant.Alert.Utility
+import Assistant.Commits
+import Assistant.Drop
import Logs.Transfer
+import Logs.Location
+import qualified Git
+import qualified Remote
+import qualified Types.Remote as Remote
+import Annex.Content
+import Annex.Wanted
+import Config.Files
+import qualified Data.Map as M
import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
+import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
+import System.Posix.Process (getProcessGroupIDOf)
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
@@ -76,3 +92,186 @@ runTransferThread' program d run = go
_ -> done
done = runAssistant d $
flip MSemN.signal 1 <<~ transferSlots
+
+{- By the time this is called, the daemonstatus's currentTransfers map should
+ - already have been updated to include the transfer. -}
+genTransfer :: Transfer -> TransferInfo -> TransferGenerator
+genTransfer t info = case (transferRemote info, associatedFile info) of
+ (Just remote, Just file)
+ | Git.repoIsLocalUnknown (Remote.repo remote) -> do
+ -- optimisation for removable drives not plugged in
+ liftAnnex $ recordFailedTransfer t info
+ void $ removeTransfer t
+ return Nothing
+ | otherwise -> ifM (liftAnnex $ shouldTransfer t info)
+ ( do
+ debug [ "Transferring:" , describeTransfer t info ]
+ notifyTransfer
+ return $ Just (t, info, go remote file)
+ , do
+ debug [ "Skipping unnecessary transfer:",
+ describeTransfer t info ]
+ void $ removeTransfer t
+ finishedTransfer t (Just info)
+ return Nothing
+ )
+ _ -> return Nothing
+ where
+ direction = transferDirection t
+ isdownload = direction == Download
+
+ {- Alerts are only shown for successful transfers.
+ - Transfers can temporarily fail for many reasons,
+ - so there's no point in bothering the user about
+ - those. The assistant should recover.
+ -
+ - After a successful upload, handle dropping it from
+ - here, if desired. In this case, the remote it was
+ - uploaded to is known to have it.
+ -
+ - Also, after a successful transfer, the location
+ - log has changed. Indicate that a commit has been
+ - made, in order to queue a push of the git-annex
+ - branch out to remotes that did not participate
+ - in the transfer.
+ -
+ - If the process failed, it could have crashed,
+ - so remove the transfer from the list of current
+ - transfers, just in case it didn't stop
+ - in a way that lets the TransferWatcher do its
+ - usual cleanup. However, first check if something else is
+ - running the transfer, to avoid removing active transfers.
+ -}
+ go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
+ ( do
+ void $ addAlert $ makeAlertFiller True $
+ transferFileAlert direction True file
+ unless isdownload $
+ handleDrops
+ ("object uploaded to " ++ show remote)
+ True (transferKey t)
+ (associatedFile info)
+ (Just remote)
+ void recordCommit
+ , whenM (liftAnnex $ isNothing <$> checkTransfer t) $
+ void $ removeTransfer t
+ )
+
+{- Called right before a transfer begins, this is a last chance to avoid
+ - unnecessary transfers.
+ -
+ - For downloads, we obviously don't need to download if the already
+ - have the object.
+ -
+ - Smilarly, for uploads, check if the remote is known to already have
+ - the object.
+ -
+ - Also, uploads get queued to all remotes, in order of cost.
+ - This may mean, for example, that an object is uploaded over the LAN
+ - to a locally paired client, and once that upload is done, a more
+ - expensive transfer remote no longer wants the object. (Since
+ - all the clients have it already.) So do one last check if this is still
+ - preferred content.
+ -
+ - We'll also do one last preferred content check for downloads. An
+ - example of a case where this could be needed is if a download is queued
+ - for a file that gets moved out of an archive directory -- but before
+ - that download can happen, the file is put back in the archive.
+ -}
+shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
+shouldTransfer t info
+ | transferDirection t == Download =
+ (not <$> inAnnex key) <&&> wantGet True file
+ | transferDirection t == Upload = case transferRemote info of
+ Nothing -> return False
+ Just r -> notinremote r
+ <&&> wantSend True file (Remote.uuid r)
+ | otherwise = return False
+ where
+ key = transferKey t
+ file = associatedFile info
+
+ {- Trust the location log to check if the remote already has
+ - the key. This avoids a roundtrip to the remote. -}
+ notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
+
+{- Queue uploads of files downloaded to us, spreading them
+ - out to other reachable remotes.
+ -
+ - Downloading a file may have caused a remote to not want it;
+ - so check for drops from remotes.
+ -
+ - Uploading a file may cause the local repo, or some other remote to not
+ - want it; handle that too.
+ -}
+finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
+finishedTransfer t (Just info)
+ | transferDirection t == Download =
+ whenM (liftAnnex $ inAnnex $ transferKey t) $ do
+ dodrops False
+ queueTransfersMatching (/= transferUUID t)
+ "newly received object"
+ Later (transferKey t) (associatedFile info) Upload
+ | otherwise = dodrops True
+ where
+ dodrops fromhere = handleDrops
+ ("drop wanted after " ++ describeTransfer t info)
+ fromhere (transferKey t) (associatedFile info) Nothing
+finishedTransfer _ _ = noop
+
+{- Pause a running transfer. -}
+pauseTransfer :: Transfer -> Assistant ()
+pauseTransfer = cancelTransfer True
+
+{- Cancel a running transfer. -}
+cancelTransfer :: Bool -> Transfer -> Assistant ()
+cancelTransfer pause t = do
+ m <- getCurrentTransfers
+ unless pause $
+ {- remove queued transfer -}
+ void $ dequeueTransfers $ equivilantTransfer t
+ {- stop running transfer -}
+ maybe noop stop (M.lookup t m)
+ where
+ stop info = do
+ {- When there's a thread associated with the
+ - transfer, it's signaled first, to avoid it
+ - displaying any alert about the transfer having
+ - failed when the transfer process is killed. -}
+ liftIO $ maybe noop signalthread $ transferTid info
+ liftIO $ maybe noop killproc $ transferPid info
+ if pause
+ then void $ alterTransferInfo t $
+ \i -> i { transferPaused = True }
+ else void $ removeTransfer t
+ signalthread tid
+ | pause = throwTo tid PauseTransfer
+ | otherwise = killThread tid
+ {- In order to stop helper processes like rsync,
+ - kill the whole process group of the process running the transfer. -}
+ killproc pid = void $ tryIO $ do
+ g <- getProcessGroupIDOf pid
+ void $ tryIO $ signalProcessGroup sigTERM g
+ threadDelay 50000 -- 0.05 second grace period
+ void $ tryIO $ signalProcessGroup sigKILL g
+
+{- Start or resume a transfer. -}
+startTransfer :: Transfer -> Assistant ()
+startTransfer t = do
+ m <- getCurrentTransfers
+ maybe startqueued go (M.lookup t m)
+ where
+ go info = maybe (start info) resume $ transferTid info
+ startqueued = do
+ is <- map snd <$> getMatchingTransfers (== t)
+ maybe noop start $ headMaybe is
+ resume tid = do
+ alterTransferInfo t $ \i -> i { transferPaused = False }
+ liftIO $ throwTo tid ResumeTransfer
+ start info = do
+ program <- liftIO readProgramFile
+ inImmediateTransferSlot program $
+ genTransfer t info
+
+getCurrentTransfers :: Assistant TransferMap
+getCurrentTransfers = currentTransfers <$> getDaemonStatus