aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-10-26 16:54:49 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-10-26 16:58:16 -0400
commit35f4b77d7c9a0d413a5dabcd8ce15ad6388d582f (patch)
tree9c5fea8ee1c9d8455cf02f0a8090a7340ca5592c
parent96efe6a666c644c55c9eb503b4286565ac7d6748 (diff)
moved code out of webapp
No code changes, aside from some changes to lifting in code that turned out to be able to run in Assistant rather than Handler.
-rw-r--r--Assistant/Sync.hs41
-rw-r--r--Assistant/Threads/TransferWatcher.hs29
-rw-r--r--Assistant/Threads/Transferrer.hs102
-rw-r--r--Assistant/Threads/Watcher.hs8
-rw-r--r--Assistant/TransferSlots.hs199
5 files changed, 245 insertions, 134 deletions
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index 43f0309fe..6a66802d5 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -23,9 +23,17 @@ import qualified Git.Command
import qualified Git.Ref
import qualified Remote
import qualified Types.Remote as Remote
+import qualified Remote.List as Remote
import qualified Annex.Branch
import Annex.UUID
import Annex.TaggedPush
+import qualified Config
+import Git.Config
+import Assistant.NamedThread
+import Assistant.Threads.Watcher (watchThread, WatcherControl(..))
+import Assistant.TransferSlots
+import Assistant.TransferQueue
+import Logs.Transfer
import Data.Time.Clock
import qualified Data.Map as M
@@ -233,3 +241,36 @@ syncRemote remote = do
reconnectRemotes False [remote]
addScanRemotes True [remote]
void $ liftIO $ forkIO $ thread
+
+{- Use Nothing to change autocommit setting; or a remote to change
+ - its sync setting. -}
+changeSyncable :: Maybe Remote -> Bool -> Assistant ()
+changeSyncable Nothing enable = do
+ liftAnnex $ Config.setConfig key (boolConfig enable)
+ liftIO . maybe noop (`throwTo` signal)
+ =<< namedThreadId watchThread
+ where
+ key = Config.annexConfig "autocommit"
+ signal
+ | enable = ResumeWatcher
+ | otherwise = PauseWatcher
+changeSyncable (Just r) True = do
+ liftAnnex $ changeSyncFlag r True
+ syncRemote r
+changeSyncable (Just r) False = do
+ liftAnnex $ changeSyncFlag r False
+ updateSyncRemotes
+ {- Stop all transfers to or from this remote.
+ - XXX Can't stop any ongoing scan, or git syncs. -}
+ void $ dequeueTransfers tofrom
+ mapM_ (cancelTransfer False) =<<
+ filter tofrom . M.keys . currentTransfers <$> getDaemonStatus
+ where
+ tofrom t = transferUUID t == Remote.uuid r
+
+changeSyncFlag :: Remote -> Bool -> Annex ()
+changeSyncFlag r enabled = do
+ Config.setConfig key (boolConfig enabled)
+ void Remote.remoteListRefresh
+ where
+ key = Config.remoteConfig (Remote.repo r) "sync"
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index 9bc851d4e..fc09373e7 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -9,9 +9,7 @@ module Assistant.Threads.TransferWatcher where
import Assistant.Common
import Assistant.DaemonStatus
-import Assistant.TransferQueue
-import Assistant.Drop
-import Annex.Content
+import Assistant.TransferSlots
import Logs.Transfer
import Utility.DirWatcher
import Utility.DirWatcher.Types
@@ -98,28 +96,3 @@ onDel file = case parseTransferFile file of
- runs. -}
threadDelay 10000000 -- 10 seconds
finished t minfo
-
-{- Queue uploads of files downloaded to us, spreading them
- - out to other reachable remotes.
- -
- - Downloading a file may have caused a remote to not want it;
- - so check for drops from remotes.
- -
- - Uploading a file may cause the local repo, or some other remote to not
- - want it; handle that too.
- -}
-finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
-finishedTransfer t (Just info)
- | transferDirection t == Download =
- whenM (liftAnnex $ inAnnex $ transferKey t) $ do
- dodrops False
- queueTransfersMatching (/= transferUUID t)
- "newly received object"
- Later (transferKey t) (associatedFile info) Upload
- | otherwise = dodrops True
- where
- dodrops fromhere = handleDrops
- ("drop wanted after " ++ describeTransfer t info)
- fromhere (transferKey t) (associatedFile info) Nothing
-finishedTransfer _ _ = noop
-
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 98f8b6ad7..82f3f3e10 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -36,105 +36,3 @@ transfererThread = namedThread "Transferrer" $ do
where
{- Skip transfers that are already running. -}
notrunning = isNothing . startedTime
-
-{- By the time this is called, the daemonstatus's currentTransfers map should
- - already have been updated to include the transfer. -}
-genTransfer :: Transfer -> TransferInfo -> TransferGenerator
-genTransfer t info = case (transferRemote info, associatedFile info) of
- (Just remote, Just file)
- | Git.repoIsLocalUnknown (Remote.repo remote) -> do
- -- optimisation for removable drives not plugged in
- liftAnnex $ recordFailedTransfer t info
- void $ removeTransfer t
- return Nothing
- | otherwise -> ifM (liftAnnex $ shouldTransfer t info)
- ( do
- debug [ "Transferring:" , describeTransfer t info ]
- notifyTransfer
- return $ Just (t, info, go remote file)
- , do
- debug [ "Skipping unnecessary transfer:",
- describeTransfer t info ]
- void $ removeTransfer t
- finishedTransfer t (Just info)
- return Nothing
- )
- _ -> return Nothing
- where
- direction = transferDirection t
- isdownload = direction == Download
-
- {- Alerts are only shown for successful transfers.
- - Transfers can temporarily fail for many reasons,
- - so there's no point in bothering the user about
- - those. The assistant should recover.
- -
- - After a successful upload, handle dropping it from
- - here, if desired. In this case, the remote it was
- - uploaded to is known to have it.
- -
- - Also, after a successful transfer, the location
- - log has changed. Indicate that a commit has been
- - made, in order to queue a push of the git-annex
- - branch out to remotes that did not participate
- - in the transfer.
- -
- - If the process failed, it could have crashed,
- - so remove the transfer from the list of current
- - transfers, just in case it didn't stop
- - in a way that lets the TransferWatcher do its
- - usual cleanup. However, first check if something else is
- - running the transfer, to avoid removing active transfers.
- -}
- go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
- ( do
- void $ addAlert $ makeAlertFiller True $
- transferFileAlert direction True file
- unless isdownload $
- handleDrops
- ("object uploaded to " ++ show remote)
- True (transferKey t)
- (associatedFile info)
- (Just remote)
- void recordCommit
- , whenM (liftAnnex $ isNothing <$> checkTransfer t) $
- void $ removeTransfer t
- )
-
-{- Called right before a transfer begins, this is a last chance to avoid
- - unnecessary transfers.
- -
- - For downloads, we obviously don't need to download if the already
- - have the object.
- -
- - Smilarly, for uploads, check if the remote is known to already have
- - the object.
- -
- - Also, uploads get queued to all remotes, in order of cost.
- - This may mean, for example, that an object is uploaded over the LAN
- - to a locally paired client, and once that upload is done, a more
- - expensive transfer remote no longer wants the object. (Since
- - all the clients have it already.) So do one last check if this is still
- - preferred content.
- -
- - We'll also do one last preferred content check for downloads. An
- - example of a case where this could be needed is if a download is queued
- - for a file that gets moved out of an archive directory -- but before
- - that download can happen, the file is put back in the archive.
- -}
-shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
-shouldTransfer t info
- | transferDirection t == Download =
- (not <$> inAnnex key) <&&> wantGet True file
- | transferDirection t == Upload = case transferRemote info of
- Nothing -> return False
- Just r -> notinremote r
- <&&> wantSend True file (Remote.uuid r)
- | otherwise = return False
- where
- key = transferKey t
- file = associatedFile info
-
- {- Trust the location log to check if the remote already has
- - the key. This avoids a roundtrip to the remote. -}
- notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index a44664639..3eedbe145 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -9,7 +9,7 @@
module Assistant.Threads.Watcher (
watchThread,
- WatcherException(..),
+ WatcherControl(..),
checkCanWatch,
needLsof,
onAddSymlink,
@@ -64,10 +64,10 @@ needLsof = error $ unlines
]
{- A special exception that can be thrown to pause or resume the watcher. -}
-data WatcherException = PauseWatcher | ResumeWatcher
+data WatcherControl = PauseWatcher | ResumeWatcher
deriving (Show, Eq, Typeable)
-instance E.Exception WatcherException
+instance E.Exception WatcherControl
watchThread :: NamedThread
watchThread = namedThread "Watcher" $
@@ -107,7 +107,7 @@ runWatcher = do
where
hook a = Just <$> asIO2 (runHandler a)
-waitFor :: WatcherException -> Assistant () -> Assistant ()
+waitFor :: WatcherControl -> Assistant () -> Assistant ()
waitFor sig next = do
r <- liftIO (E.try pause :: IO (Either E.SomeException ()))
case r of
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 81a778a0a..36d557c3d 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -13,11 +13,27 @@ import Assistant.Types.TransferSlots
import Assistant.DaemonStatus
import Assistant.TransferrerPool
import Assistant.Types.TransferrerPool
+import Assistant.Types.TransferQueue
+import Assistant.TransferQueue
+import Assistant.Alert
+import Assistant.Alert.Utility
+import Assistant.Commits
+import Assistant.Drop
import Logs.Transfer
+import Logs.Location
+import qualified Git
+import qualified Remote
+import qualified Types.Remote as Remote
+import Annex.Content
+import Annex.Wanted
+import Config.Files
+import qualified Data.Map as M
import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
+import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
+import System.Posix.Process (getProcessGroupIDOf)
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
@@ -76,3 +92,186 @@ runTransferThread' program d run = go
_ -> done
done = runAssistant d $
flip MSemN.signal 1 <<~ transferSlots
+
+{- By the time this is called, the daemonstatus's currentTransfers map should
+ - already have been updated to include the transfer. -}
+genTransfer :: Transfer -> TransferInfo -> TransferGenerator
+genTransfer t info = case (transferRemote info, associatedFile info) of
+ (Just remote, Just file)
+ | Git.repoIsLocalUnknown (Remote.repo remote) -> do
+ -- optimisation for removable drives not plugged in
+ liftAnnex $ recordFailedTransfer t info
+ void $ removeTransfer t
+ return Nothing
+ | otherwise -> ifM (liftAnnex $ shouldTransfer t info)
+ ( do
+ debug [ "Transferring:" , describeTransfer t info ]
+ notifyTransfer
+ return $ Just (t, info, go remote file)
+ , do
+ debug [ "Skipping unnecessary transfer:",
+ describeTransfer t info ]
+ void $ removeTransfer t
+ finishedTransfer t (Just info)
+ return Nothing
+ )
+ _ -> return Nothing
+ where
+ direction = transferDirection t
+ isdownload = direction == Download
+
+ {- Alerts are only shown for successful transfers.
+ - Transfers can temporarily fail for many reasons,
+ - so there's no point in bothering the user about
+ - those. The assistant should recover.
+ -
+ - After a successful upload, handle dropping it from
+ - here, if desired. In this case, the remote it was
+ - uploaded to is known to have it.
+ -
+ - Also, after a successful transfer, the location
+ - log has changed. Indicate that a commit has been
+ - made, in order to queue a push of the git-annex
+ - branch out to remotes that did not participate
+ - in the transfer.
+ -
+ - If the process failed, it could have crashed,
+ - so remove the transfer from the list of current
+ - transfers, just in case it didn't stop
+ - in a way that lets the TransferWatcher do its
+ - usual cleanup. However, first check if something else is
+ - running the transfer, to avoid removing active transfers.
+ -}
+ go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
+ ( do
+ void $ addAlert $ makeAlertFiller True $
+ transferFileAlert direction True file
+ unless isdownload $
+ handleDrops
+ ("object uploaded to " ++ show remote)
+ True (transferKey t)
+ (associatedFile info)
+ (Just remote)
+ void recordCommit
+ , whenM (liftAnnex $ isNothing <$> checkTransfer t) $
+ void $ removeTransfer t
+ )
+
+{- Called right before a transfer begins, this is a last chance to avoid
+ - unnecessary transfers.
+ -
+ - For downloads, we obviously don't need to download if the already
+ - have the object.
+ -
+ - Smilarly, for uploads, check if the remote is known to already have
+ - the object.
+ -
+ - Also, uploads get queued to all remotes, in order of cost.
+ - This may mean, for example, that an object is uploaded over the LAN
+ - to a locally paired client, and once that upload is done, a more
+ - expensive transfer remote no longer wants the object. (Since
+ - all the clients have it already.) So do one last check if this is still
+ - preferred content.
+ -
+ - We'll also do one last preferred content check for downloads. An
+ - example of a case where this could be needed is if a download is queued
+ - for a file that gets moved out of an archive directory -- but before
+ - that download can happen, the file is put back in the archive.
+ -}
+shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
+shouldTransfer t info
+ | transferDirection t == Download =
+ (not <$> inAnnex key) <&&> wantGet True file
+ | transferDirection t == Upload = case transferRemote info of
+ Nothing -> return False
+ Just r -> notinremote r
+ <&&> wantSend True file (Remote.uuid r)
+ | otherwise = return False
+ where
+ key = transferKey t
+ file = associatedFile info
+
+ {- Trust the location log to check if the remote already has
+ - the key. This avoids a roundtrip to the remote. -}
+ notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
+
+{- Queue uploads of files downloaded to us, spreading them
+ - out to other reachable remotes.
+ -
+ - Downloading a file may have caused a remote to not want it;
+ - so check for drops from remotes.
+ -
+ - Uploading a file may cause the local repo, or some other remote to not
+ - want it; handle that too.
+ -}
+finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
+finishedTransfer t (Just info)
+ | transferDirection t == Download =
+ whenM (liftAnnex $ inAnnex $ transferKey t) $ do
+ dodrops False
+ queueTransfersMatching (/= transferUUID t)
+ "newly received object"
+ Later (transferKey t) (associatedFile info) Upload
+ | otherwise = dodrops True
+ where
+ dodrops fromhere = handleDrops
+ ("drop wanted after " ++ describeTransfer t info)
+ fromhere (transferKey t) (associatedFile info) Nothing
+finishedTransfer _ _ = noop
+
+{- Pause a running transfer. -}
+pauseTransfer :: Transfer -> Assistant ()
+pauseTransfer = cancelTransfer True
+
+{- Cancel a running transfer. -}
+cancelTransfer :: Bool -> Transfer -> Assistant ()
+cancelTransfer pause t = do
+ m <- getCurrentTransfers
+ unless pause $
+ {- remove queued transfer -}
+ void $ dequeueTransfers $ equivilantTransfer t
+ {- stop running transfer -}
+ maybe noop stop (M.lookup t m)
+ where
+ stop info = do
+ {- When there's a thread associated with the
+ - transfer, it's signaled first, to avoid it
+ - displaying any alert about the transfer having
+ - failed when the transfer process is killed. -}
+ liftIO $ maybe noop signalthread $ transferTid info
+ liftIO $ maybe noop killproc $ transferPid info
+ if pause
+ then void $ alterTransferInfo t $
+ \i -> i { transferPaused = True }
+ else void $ removeTransfer t
+ signalthread tid
+ | pause = throwTo tid PauseTransfer
+ | otherwise = killThread tid
+ {- In order to stop helper processes like rsync,
+ - kill the whole process group of the process running the transfer. -}
+ killproc pid = void $ tryIO $ do
+ g <- getProcessGroupIDOf pid
+ void $ tryIO $ signalProcessGroup sigTERM g
+ threadDelay 50000 -- 0.05 second grace period
+ void $ tryIO $ signalProcessGroup sigKILL g
+
+{- Start or resume a transfer. -}
+startTransfer :: Transfer -> Assistant ()
+startTransfer t = do
+ m <- getCurrentTransfers
+ maybe startqueued go (M.lookup t m)
+ where
+ go info = maybe (start info) resume $ transferTid info
+ startqueued = do
+ is <- map snd <$> getMatchingTransfers (== t)
+ maybe noop start $ headMaybe is
+ resume tid = do
+ alterTransferInfo t $ \i -> i { transferPaused = False }
+ liftIO $ throwTo tid ResumeTransfer
+ start info = do
+ program <- liftIO readProgramFile
+ inImmediateTransferSlot program $
+ genTransfer t info
+
+getCurrentTransfers :: Assistant TransferMap
+getCurrentTransfers = currentTransfers <$> getDaemonStatus