diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/BranchChange.hs | 9 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 13 | ||||
-rw-r--r-- | Assistant/Sync.hs | 20 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 35 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 128 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 12 |
7 files changed, 196 insertions, 48 deletions
diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs index b166c8777..d1d1c20df 100644 --- a/Assistant/BranchChange.hs +++ b/Assistant/BranchChange.hs @@ -8,14 +8,15 @@ module Assistant.BranchChange where import Control.Concurrent.MSampleVar +import Assistant.Common -type BranchChangeHandle = MSampleVar () +newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) newBranchChangeHandle :: IO BranchChangeHandle -newBranchChangeHandle = newEmptySV +newBranchChangeHandle = BranchChangeHandle <$> newEmptySV branchChanged :: BranchChangeHandle -> IO () -branchChanged = flip writeSV () +branchChanged (BranchChangeHandle h) = writeSV h () waitBranchChange :: BranchChangeHandle -> IO () -waitBranchChange = readSV +waitBranchChange (BranchChangeHandle h) = readSV h diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index f411dda07..7842c1884 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -8,6 +8,7 @@ module Assistant.Pushes where import Common.Annex +import Utility.TSet import Control.Concurrent.STM import Data.Time.Clock @@ -17,6 +18,9 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap +{- Used to notify about successful pushes. -} +newtype PushNotifier = PushNotifier (TSet UUID) + {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} @@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $ store m | m == M.empty = noop | otherwise = putTMVar v $! m + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier <$> newTSet + +notifyPush :: [UUID] -> PushNotifier -> IO () +notifyPush us (PushNotifier s) = putTSet s us + +waitPush :: PushNotifier -> IO [UUID] +waitPush (PushNotifier s) = getTSet s diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6c167e2ea..f9a513d94 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -36,9 +36,9 @@ import Control.Concurrent - the remotes have diverged from the local git-annex branch. Otherwise, - it's sufficient to requeue failed transfers. -} -reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () -reconnectRemotes _ _ _ _ [] = noop -reconnectRemotes threadname st dstatus scanremotes rs = void $ +reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO () +reconnectRemotes _ _ _ _ _ [] = noop +reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $ alertWhile dstatus (syncAlert rs) $ do (ok, diverged) <- sync =<< runThreadState st (inRepo Git.Branch.current) @@ -50,7 +50,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ sync (Just branch) = do diverged <- manualPull st (Just branch) gitremotes now <- getCurrentTime - ok <- pushToRemotes threadname now st Nothing gitremotes + ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes return (ok, diverged) {- No local branch exists yet, but we can try pulling. -} sync Nothing = do @@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ - them. While ugly, those branches are reserved for pushing by us, and - so our pushes will succeed. -} -pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool -pushToRemotes threadname now st mpushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool +pushToRemotes threadname now st mpushnotifier mpushmap remotes = do (g, branch, u) <- runThreadState st $ (,,) <$> gitRepo <*> inRepo Git.Branch.current @@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do updatemap succeeded [] let ok = null failed if ok - then return ok + then do + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier + return ok else if shouldretry then retry branch g u failed else fallback branch g u failed @@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do ] (succeeded, failed) <- inParallel (pushfallback g u branch) rs updatemap succeeded failed + unless (null succeeded) $ + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g @@ -157,4 +161,4 @@ manualPull st currentbranch remotes = do syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote st dstatus scanremotes remote = do runThreadState st $ updateSyncRemotes dstatus - void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] + void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote] diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index 462f5843c..c36b544a7 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -38,20 +39,20 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -mountWatcherThread st handle scanremotes = thread $ +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +mountWatcherThread st handle scanremotes pushnotifier = thread $ #if WITH_DBUS - dbusThread st handle scanremotes + dbusThread st handle scanremotes pushnotifier #else - pollingThread st handle scanremotes + pollingThread st handle scanremotes pushnotifier #endif where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSession) onerr where go client = ifM (checkMountMonitor client) ( do @@ -64,7 +65,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -75,7 +76,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr runThreadState st $ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus scanremotes + pollinstead = pollingThread st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} @@ -137,24 +138,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -pollingThread st dstatus scanremotes = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = + mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () -handleMount st dstatus scanremotes dir = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () +handleMount st dstatus scanremotes pushnotifier dir = do debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index a8daa9435..2c637f414 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -32,12 +33,12 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes = thread $ - dbusThread st dstatus scanremotes +netWatcherThread st dstatus scanremotes pushnotifier = thread $ + dbusThread st dstatus scanremotes pushnotifier #else -netWatcherThread _ _ _ = thread noop +netWatcherThread _ _ _ _ = thread noop #endif where thread = NamedThread thisThread @@ -47,17 +48,17 @@ netWatcherThread _ _ _ = thread noop - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -netWatcherFallbackThread st dstatus scanremotes = thread $ +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSystem) onerr where go client = ifM (checkNetMonitor client) ( do @@ -72,7 +73,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")" handle = do debug thisThread ["detected network connection"] - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} @@ -126,9 +127,9 @@ listenWicdConnections client callback = #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -handleConnection st dstatus scanremotes = - reconnectRemotes thisThread st dstatus scanremotes +handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +handleConnection st dstatus scanremotes pushnotifier = + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< networkRemotes st {- Finds network remotes. -} diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs new file mode 100644 index 000000000..12cbb3206 --- /dev/null +++ b/Assistant/Threads/PushNotifier.hs @@ -0,0 +1,128 @@ +{- git-annex assistant push notification thread, using XMPP + - + - This handles both sending outgoing push notifications, and receiving + - incoming push notifications. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PushNotifier where + +import Assistant.Common +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Pushes +import qualified Remote + +import Network.Protocol.XMPP +import Network +import Control.Concurrent +import qualified Data.Text as T +import qualified Data.Set as S +import Utility.FileMode + +thisThread :: ThreadName +thisThread = "PushNotifier" + +pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do + v <- runThreadState st $ getXMPPCreds + case v of + Nothing -> nocreds + Just c -> case parseJID (xmppUsername c) of + Nothing -> nocreds + Just jid -> void $ client c jid + where + nocreds = do + -- TODO alert + return () -- exit thread + + client c jid = runClient server jid (xmppUsername c) (xmppPassword c) $ do + void $ bindJID jid + void $ putStanza $ emptyPresence PresenceUnavailable + s <- getSession + _ <- liftIO $ forkIO $ void $ sendnotifications s + receivenotifications + where + server = Server + (JID Nothing (jidDomain jid) Nothing) + (xmppHostname c) + (PortNumber $ fromIntegral $ xmppPort c) + + sendnotifications session = runXMPP session $ forever $ do + us <- liftIO $ waitPush pushnotifier + {- Toggle presence to send the notification. -} + putStanza $ (emptyPresence PresenceAvailable) + { presenceID = Just $ encodePushNotification us } + putStanza $ emptyPresence PresenceUnavailable + + receivenotifications = forever $ do + s <- getStanza + case s of + ReceivedPresence (Presence { presenceType = PresenceAvailable, presenceID = Just t }) -> + maybe noop (liftIO . pull dstatus) + (decodePushNotification t) + _ -> noop + +{- Everything we need to know to connect to an XMPP server. -} +data XMPPCreds = XMPPCreds + { xmppUsername :: T.Text + , xmppPassword :: T.Text + , xmppHostname :: HostName + , xmppPort :: Int + } + deriving (Read, Show) + +getXMPPCreds :: Annex (Maybe XMPPCreds) +getXMPPCreds = do + f <- xmppCredsFile + s <- liftIO $ catchMaybeIO $ readFile f + return $ readish =<< s + +setXMPPCreds :: XMPPCreds -> Annex () +setXMPPCreds creds = do + f <- xmppCredsFile + liftIO $ do + h <- openFile f WriteMode + modifyFileMode f $ removeModes + [groupReadMode, otherReadMode] + hPutStr h (show creds) + hClose h + +xmppCredsFile :: Annex FilePath +xmppCredsFile = do + dir <- fromRepo gitAnnexCredsDir + return $ dir </> "notify-xmpp" + +{- A push notification is encoded in the id field of an XMPP presence + - notification, in the form: "git-annex-push:uuid[:uuid:...] + - + - Git repos can be pushed to that do not have a git-annex uuid; an empty + - string is used for those. + -} +prefix :: T.Text +prefix = T.pack "git-annex-push:" + +delim :: T.Text +delim = T.pack ":" + +encodePushNotification :: [UUID] -> T.Text +encodePushNotification us = T.concat + [ prefix + , T.intercalate delim $ map (T.pack . fromUUID) us + ] + +decodePushNotification :: T.Text -> Maybe [UUID] +decodePushNotification t = map (toUUID . T.unpack) . T.splitOn delim + <$> T.stripPrefix prefix t + +pull :: DaemonStatusHandle -> [UUID] -> IO () +pull _ [] = noop +pull dstatus us = do + rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus + print ("TODO pull from", rs) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 4f3a2dd09..295ceddc9 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread -pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread +pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do ] now <- getCurrentTime void $ alertWhile dstatus (pushRetryAlert topush) $ - pushToRemotes thisThread now st (Just pushmap) topush + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread -pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread +pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do <$> getDaemonStatus dstatus unless (null remotes) $ void $ alertWhile dstatus (pushAlert remotes) $ - pushToRemotes thisThread now st (Just pushmap) remotes + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do debug thisThread [ "delaying push of" |