diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/BranchChange.hs | 9 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 13 | ||||
-rw-r--r-- | Assistant/Sync.hs | 30 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 35 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 171 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 12 |
7 files changed, 244 insertions, 53 deletions
diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs index b166c8777..d1d1c20df 100644 --- a/Assistant/BranchChange.hs +++ b/Assistant/BranchChange.hs @@ -8,14 +8,15 @@ module Assistant.BranchChange where import Control.Concurrent.MSampleVar +import Assistant.Common -type BranchChangeHandle = MSampleVar () +newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) newBranchChangeHandle :: IO BranchChangeHandle -newBranchChangeHandle = newEmptySV +newBranchChangeHandle = BranchChangeHandle <$> newEmptySV branchChanged :: BranchChangeHandle -> IO () -branchChanged = flip writeSV () +branchChanged (BranchChangeHandle h) = writeSV h () waitBranchChange :: BranchChangeHandle -> IO () -waitBranchChange = readSV +waitBranchChange (BranchChangeHandle h) = readSV h diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index f411dda07..7842c1884 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -8,6 +8,7 @@ module Assistant.Pushes where import Common.Annex +import Utility.TSet import Control.Concurrent.STM import Data.Time.Clock @@ -17,6 +18,9 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap +{- Used to notify about successful pushes. -} +newtype PushNotifier = PushNotifier (TSet UUID) + {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} @@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $ store m | m == M.empty = noop | otherwise = putTMVar v $! m + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier <$> newTSet + +notifyPush :: [UUID] -> PushNotifier -> IO () +notifyPush us (PushNotifier s) = putTSet s us + +waitPush :: PushNotifier -> IO [UUID] +waitPush (PushNotifier s) = getTSet s diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6c167e2ea..e332d7856 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -36,9 +36,9 @@ import Control.Concurrent - the remotes have diverged from the local git-annex branch. Otherwise, - it's sufficient to requeue failed transfers. -} -reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () -reconnectRemotes _ _ _ _ [] = noop -reconnectRemotes threadname st dstatus scanremotes rs = void $ +reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO () +reconnectRemotes _ _ _ _ _ [] = noop +reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $ alertWhile dstatus (syncAlert rs) $ do (ok, diverged) <- sync =<< runThreadState st (inRepo Git.Branch.current) @@ -48,13 +48,13 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ (gitremotes, _specialremotes) = partition (Git.repoIsUrl . Remote.repo) rs sync (Just branch) = do - diverged <- manualPull st (Just branch) gitremotes + diverged <- snd <$> manualPull st (Just branch) gitremotes now <- getCurrentTime - ok <- pushToRemotes threadname now st Nothing gitremotes + ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes return (ok, diverged) {- No local branch exists yet, but we can try pulling. -} sync Nothing = do - diverged <- manualPull st Nothing gitremotes + diverged <- snd <$> manualPull st Nothing gitremotes return (True, diverged) {- Updates the local sync branch, then pushes it to all remotes, in @@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ - them. While ugly, those branches are reserved for pushing by us, and - so our pushes will succeed. -} -pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool -pushToRemotes threadname now st mpushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool +pushToRemotes threadname now st mpushnotifier mpushmap remotes = do (g, branch, u) <- runThreadState st $ (,,) <$> gitRepo <*> inRepo Git.Branch.current @@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do updatemap succeeded [] let ok = null failed if ok - then return ok + then do + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier + return ok else if shouldretry then retry branch g u failed else fallback branch g u failed @@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do ] (succeeded, failed) <- inParallel (pushfallback g u branch) rs updatemap succeeded failed + unless (null succeeded) $ + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g @@ -143,18 +147,18 @@ pushToRemotes threadname now st mpushmap remotes = do where s = show $ Git.Ref.base b {- Manually pull from remotes and merge their branches. -} -manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO Bool +manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO ([Bool], Bool) manualPull st currentbranch remotes = do g <- runThreadState st gitRepo - forM_ remotes $ \r -> + results <- forM remotes $ \r -> Git.Command.runBool "fetch" [Param $ Remote.name r] g haddiverged <- runThreadState st Annex.Branch.forceUpdate forM_ remotes $ \r -> runThreadState st $ Command.Sync.mergeRemote r currentbranch - return haddiverged + return (results, haddiverged) {- Start syncing a newly added remote, using a background thread. -} syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote st dstatus scanremotes remote = do runThreadState st $ updateSyncRemotes dstatus - void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] + void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote] diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index 462f5843c..c36b544a7 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -38,20 +39,20 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -mountWatcherThread st handle scanremotes = thread $ +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +mountWatcherThread st handle scanremotes pushnotifier = thread $ #if WITH_DBUS - dbusThread st handle scanremotes + dbusThread st handle scanremotes pushnotifier #else - pollingThread st handle scanremotes + pollingThread st handle scanremotes pushnotifier #endif where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSession) onerr where go client = ifM (checkMountMonitor client) ( do @@ -64,7 +65,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -75,7 +76,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr runThreadState st $ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus scanremotes + pollinstead = pollingThread st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} @@ -137,24 +138,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -pollingThread st dstatus scanremotes = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = + mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () -handleMount st dstatus scanremotes dir = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () +handleMount st dstatus scanremotes pushnotifier dir = do debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index a8daa9435..2c637f414 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -32,12 +33,12 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes = thread $ - dbusThread st dstatus scanremotes +netWatcherThread st dstatus scanremotes pushnotifier = thread $ + dbusThread st dstatus scanremotes pushnotifier #else -netWatcherThread _ _ _ = thread noop +netWatcherThread _ _ _ _ = thread noop #endif where thread = NamedThread thisThread @@ -47,17 +48,17 @@ netWatcherThread _ _ _ = thread noop - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -netWatcherFallbackThread st dstatus scanremotes = thread $ +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSystem) onerr where go client = ifM (checkNetMonitor client) ( do @@ -72,7 +73,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")" handle = do debug thisThread ["detected network connection"] - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} @@ -126,9 +127,9 @@ listenWicdConnections client callback = #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -handleConnection st dstatus scanremotes = - reconnectRemotes thisThread st dstatus scanremotes +handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +handleConnection st dstatus scanremotes pushnotifier = + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< networkRemotes st {- Finds network remotes. -} diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs new file mode 100644 index 000000000..84fe4952a --- /dev/null +++ b/Assistant/Threads/PushNotifier.hs @@ -0,0 +1,171 @@ +{- git-annex assistant push notification thread, using XMPP + - + - This handles both sending outgoing push notifications, and receiving + - incoming push notifications. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PushNotifier where + +import Assistant.Common +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Pushes +import Assistant.Sync +import qualified Remote + +import Network.Protocol.XMPP +import Network +import Control.Concurrent +import qualified Data.Text as T +import qualified Data.Set as S +import Utility.FileMode +import qualified Git.Branch +import Data.XML.Types + +thisThread :: ThreadName +thisThread = "PushNotifier" + +pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do + v <- runThreadState st $ getXMPPCreds + case v of + Nothing -> nocreds + Just c -> case parseJID (xmppJID c) of + Nothing -> nocreds + Just jid -> void $ client c jid + where + nocreds = do + error "no creds" -- TODO alert + return () -- exit thread + + client c jid = runClient server jid (xmppUsername c) (xmppPassword c) $ do + void $ bindJID jid + s <- getSession + _ <- liftIO $ forkIO $ void $ runXMPP s $ + receivenotifications + sendnotifications + where + server = Server + (JID Nothing (jidDomain jid) Nothing) + (xmppHostname c) + (PortNumber $ fromIntegral $ xmppPort c) + + sendnotifications = forever $ do + us <- liftIO $ waitPush pushnotifier + let payload = [extendedAway, encodePushNotification us] + let notification = (emptyPresence PresenceAvailable) + { presencePayloads = payload } + putStanza notification + + receivenotifications = forever $ do + s <- getStanza + liftIO $ debug thisThread ["received XMPP:", show s] + case s of + ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> + liftIO $ pull st dstatus $ + concat $ catMaybes $ + map decodePushNotification $ + presencePayloads p + _ -> noop + +{- Everything we need to know to connect to an XMPP server. -} +data XMPPCreds = XMPPCreds + { xmppUsername :: T.Text + , xmppPassword :: T.Text + , xmppHostname :: HostName + , xmppPort :: Int + {- Something like username@hostname, but not necessarily the same + - username or hostname used to connect to the server. -} + , xmppJID :: T.Text + } + deriving (Read, Show) + +getXMPPCreds :: Annex (Maybe XMPPCreds) +getXMPPCreds = do + f <- xmppCredsFile + s <- liftIO $ catchMaybeIO $ readFile f + return $ readish =<< s + +setXMPPCreds :: XMPPCreds -> Annex () +setXMPPCreds creds = do + f <- xmppCredsFile + liftIO $ do + h <- openFile f WriteMode + modifyFileMode f $ removeModes + [groupReadMode, otherReadMode] + hPutStr h (show creds) + hClose h + +xmppCredsFile :: Annex FilePath +xmppCredsFile = do + dir <- fromRepo gitAnnexCredsDir + return $ dir </> "notify-xmpp" + +{- Marks the client as extended away. -} +extendedAway :: Element +extendedAway = Element (Name (T.pack "show") Nothing Nothing) [] + [NodeContent $ ContentText $ T.pack "xa"] + +{- Name of a git-annex tag, in our own XML namespace. + - (Not using a namespace URL to avoid unnecessary bloat.) -} +gitAnnexTagName :: Name +gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing + +pushAttr :: Name +pushAttr = Name (T.pack "push") Nothing Nothing + +uuidSep :: T.Text +uuidSep = T.pack "," + +{- git-annex tag with one push attribute per UUID pushed to. -} +encodePushNotification :: [UUID] -> Element +encodePushNotification us = Element gitAnnexTagName + [(pushAttr, [ContentText pushvalue])] [] + where + pushvalue = T.intercalate uuidSep $ + map (T.pack . fromUUID) us + +decodePushNotification :: Element -> Maybe [UUID] +decodePushNotification (Element name attrs _nodes) + | name == gitAnnexTagName && not (null us) = Just us + | otherwise = Nothing + where + us = map (toUUID . T.unpack) $ + concatMap (T.splitOn uuidSep . T.concat . map fromContent . snd) $ + filter ispush attrs + ispush (k, _) = k == pushAttr + fromContent (ContentText t) = t + fromContent (ContentEntity t) = t + +{- We only pull from one remote out of the set listed in the push + - notification, as an optimisation. + - + - Note that it might be possible (though very unlikely) for the push + - notification to take a while to be sent, and multiple pushes happen + - before it is sent, so it includes multiple remotes that were pushed + - to at different times. + - + - It could then be the case that the remote we choose had the earlier + - push sent to it, but then failed to get the later push, and so is not + - fully up-to-date. If that happens, the pushRetryThread will come along + - and retry the push, and we'll get another notification once it succeeds, + - and pull again. -} +pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () +pull _ _ [] = noop +pull st dstatus us = do + rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus + debug thisThread $ "push notification for" : + map (fromUUID . Remote.uuid ) rs + pullone rs =<< runThreadState st (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us + + pullone [] _ = noop + pullone (r:rs) branch = + unlessM (all id . fst <$> manualPull st branch [r]) $ + pullone rs branch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 4f3a2dd09..295ceddc9 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread -pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread +pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do ] now <- getCurrentTime void $ alertWhile dstatus (pushRetryAlert topush) $ - pushToRemotes thisThread now st (Just pushmap) topush + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread -pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread +pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do <$> getDaemonStatus dstatus unless (null remotes) $ void $ alertWhile dstatus (pushAlert remotes) $ - pushToRemotes thisThread now st (Just pushmap) remotes + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do debug thisThread [ "delaying push of" |