aboutsummaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/BranchChange.hs9
-rw-r--r--Assistant/Pushes.hs13
-rw-r--r--Assistant/Sync.hs20
-rw-r--r--Assistant/Threads/MountWatcher.hs35
-rw-r--r--Assistant/Threads/NetWatcher.hs27
-rw-r--r--Assistant/Threads/PushNotifier.hs128
-rw-r--r--Assistant/Threads/Pusher.hs12
7 files changed, 196 insertions, 48 deletions
diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs
index b166c8777..d1d1c20df 100644
--- a/Assistant/BranchChange.hs
+++ b/Assistant/BranchChange.hs
@@ -8,14 +8,15 @@
module Assistant.BranchChange where
import Control.Concurrent.MSampleVar
+import Assistant.Common
-type BranchChangeHandle = MSampleVar ()
+newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ())
newBranchChangeHandle :: IO BranchChangeHandle
-newBranchChangeHandle = newEmptySV
+newBranchChangeHandle = BranchChangeHandle <$> newEmptySV
branchChanged :: BranchChangeHandle -> IO ()
-branchChanged = flip writeSV ()
+branchChanged (BranchChangeHandle h) = writeSV h ()
waitBranchChange :: BranchChangeHandle -> IO ()
-waitBranchChange = readSV
+waitBranchChange (BranchChangeHandle h) = readSV h
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
index f411dda07..7842c1884 100644
--- a/Assistant/Pushes.hs
+++ b/Assistant/Pushes.hs
@@ -8,6 +8,7 @@
module Assistant.Pushes where
import Common.Annex
+import Utility.TSet
import Control.Concurrent.STM
import Data.Time.Clock
@@ -17,6 +18,9 @@ import qualified Data.Map as M
type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap
+{- Used to notify about successful pushes. -}
+newtype PushNotifier = PushNotifier (TSet UUID)
+
{- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes.
-}
@@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $
store m
| m == M.empty = noop
| otherwise = putTMVar v $! m
+
+newPushNotifier :: IO PushNotifier
+newPushNotifier = PushNotifier <$> newTSet
+
+notifyPush :: [UUID] -> PushNotifier -> IO ()
+notifyPush us (PushNotifier s) = putTSet s us
+
+waitPush :: PushNotifier -> IO [UUID]
+waitPush (PushNotifier s) = getTSet s
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index 6c167e2ea..f9a513d94 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -36,9 +36,9 @@ import Control.Concurrent
- the remotes have diverged from the local git-annex branch. Otherwise,
- it's sufficient to requeue failed transfers.
-}
-reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO ()
-reconnectRemotes _ _ _ _ [] = noop
-reconnectRemotes threadname st dstatus scanremotes rs = void $
+reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO ()
+reconnectRemotes _ _ _ _ _ [] = noop
+reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $
alertWhile dstatus (syncAlert rs) $ do
(ok, diverged) <- sync
=<< runThreadState st (inRepo Git.Branch.current)
@@ -50,7 +50,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
sync (Just branch) = do
diverged <- manualPull st (Just branch) gitremotes
now <- getCurrentTime
- ok <- pushToRemotes threadname now st Nothing gitremotes
+ ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes
return (ok, diverged)
{- No local branch exists yet, but we can try pulling. -}
sync Nothing = do
@@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
- them. While ugly, those branches are reserved for pushing by us, and
- so our pushes will succeed.
-}
-pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool
-pushToRemotes threadname now st mpushmap remotes = do
+pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool
+pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
(g, branch, u) <- runThreadState st $ (,,)
<$> gitRepo
<*> inRepo Git.Branch.current
@@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do
updatemap succeeded []
let ok = null failed
if ok
- then return ok
+ then do
+ maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
+ return ok
else if shouldretry
then retry branch g u failed
else fallback branch g u failed
@@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do
]
(succeeded, failed) <- inParallel (pushfallback g u branch) rs
updatemap succeeded failed
+ unless (null succeeded) $
+ maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
return $ null failed
push g branch remote = Command.Sync.pushBranch remote branch g
@@ -157,4 +161,4 @@ manualPull st currentbranch remotes = do
syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO ()
syncNewRemote st dstatus scanremotes remote = do
runThreadState st $ updateSyncRemotes dstatus
- void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote]
+ void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote]
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index 462f5843c..c36b544a7 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync
+import Assistant.Pushes
import qualified Annex
import qualified Git
import Utility.ThreadScheduler
@@ -38,20 +39,20 @@ import qualified Control.Exception as E
thisThread :: ThreadName
thisThread = "MountWatcher"
-mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
-mountWatcherThread st handle scanremotes = thread $
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
+mountWatcherThread st handle scanremotes pushnotifier = thread $
#if WITH_DBUS
- dbusThread st handle scanremotes
+ dbusThread st handle scanremotes pushnotifier
#else
- pollingThread st handle scanremotes
+ pollingThread st handle scanremotes pushnotifier
#endif
where
thread = NamedThread thisThread
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSession) onerr
where
go client = ifM (checkMountMonitor client)
( do
@@ -64,7 +65,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
listen client matcher $ \_event -> do
nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted
- handleMounts st dstatus scanremotes wasmounted nowmounted
+ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
, do
runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling"
@@ -75,7 +76,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
runThreadState st $
warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
pollinstead
- pollinstead = pollingThread st dstatus scanremotes
+ pollinstead = pollingThread st dstatus scanremotes pushnotifier
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor mounts. If not, will attempt to start one. -}
@@ -137,24 +138,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback]
#endif
-pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-pollingThread st dstatus scanremotes = go =<< currentMountPoints
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints
where
go wasmounted = do
threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints
- handleMounts st dstatus scanremotes wasmounted nowmounted
+ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
go nowmounted
-handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
-handleMounts st dstatus scanremotes wasmounted nowmounted =
- mapM_ (handleMount st dstatus scanremotes . mnt_dir) $
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted =
+ mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $
S.toList $ newMountPoints wasmounted nowmounted
-handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
-handleMount st dstatus scanremotes dir = do
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO ()
+handleMount st dstatus scanremotes pushnotifier dir = do
debug thisThread ["detected mount of", dir]
- reconnectRemotes thisThread st dstatus scanremotes
+ reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< filter (Git.repoIsLocal . Remote.repo)
<$> remotesUnder st dstatus dir
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index a8daa9435..2c637f414 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync
+import Assistant.Pushes
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
@@ -32,12 +33,12 @@ import qualified Control.Exception as E
thisThread :: ThreadName
thisThread = "NetWatcher"
-netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
+netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
#if WITH_DBUS
-netWatcherThread st dstatus scanremotes = thread $
- dbusThread st dstatus scanremotes
+netWatcherThread st dstatus scanremotes pushnotifier = thread $
+ dbusThread st dstatus scanremotes pushnotifier
#else
-netWatcherThread _ _ _ = thread noop
+netWatcherThread _ _ _ _ = thread noop
#endif
where
thread = NamedThread thisThread
@@ -47,17 +48,17 @@ netWatcherThread _ _ _ = thread noop
- any networked remotes that may have not been routable for a
- while (despite the local network staying up), are synced with
- periodically. -}
-netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
-netWatcherFallbackThread st dstatus scanremotes = thread $
+netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
+netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $
runEvery (Seconds 3600) $
- handleConnection st dstatus scanremotes
+ handleConnection st dstatus scanremotes pushnotifier
where
thread = NamedThread thisThread
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSystem) onerr
where
go client = ifM (checkNetMonitor client)
( do
@@ -72,7 +73,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")"
handle = do
debug thisThread ["detected network connection"]
- handleConnection st dstatus scanremotes
+ handleConnection st dstatus scanremotes pushnotifier
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor network connections. -}
@@ -126,9 +127,9 @@ listenWicdConnections client callback =
#endif
-handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-handleConnection st dstatus scanremotes =
- reconnectRemotes thisThread st dstatus scanremotes
+handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+handleConnection st dstatus scanremotes pushnotifier =
+ reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< networkRemotes st
{- Finds network remotes. -}
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
new file mode 100644
index 000000000..12cbb3206
--- /dev/null
+++ b/Assistant/Threads/PushNotifier.hs
@@ -0,0 +1,128 @@
+{- git-annex assistant push notification thread, using XMPP
+ -
+ - This handles both sending outgoing push notifications, and receiving
+ - incoming push notifications.
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.PushNotifier where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Pushes
+import qualified Remote
+
+import Network.Protocol.XMPP
+import Network
+import Control.Concurrent
+import qualified Data.Text as T
+import qualified Data.Set as S
+import Utility.FileMode
+
+thisThread :: ThreadName
+thisThread = "PushNotifier"
+
+pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
+pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do
+ v <- runThreadState st $ getXMPPCreds
+ case v of
+ Nothing -> nocreds
+ Just c -> case parseJID (xmppUsername c) of
+ Nothing -> nocreds
+ Just jid -> void $ client c jid
+ where
+ nocreds = do
+ -- TODO alert
+ return () -- exit thread
+
+ client c jid = runClient server jid (xmppUsername c) (xmppPassword c) $ do
+ void $ bindJID jid
+ void $ putStanza $ emptyPresence PresenceUnavailable
+ s <- getSession
+ _ <- liftIO $ forkIO $ void $ sendnotifications s
+ receivenotifications
+ where
+ server = Server
+ (JID Nothing (jidDomain jid) Nothing)
+ (xmppHostname c)
+ (PortNumber $ fromIntegral $ xmppPort c)
+
+ sendnotifications session = runXMPP session $ forever $ do
+ us <- liftIO $ waitPush pushnotifier
+ {- Toggle presence to send the notification. -}
+ putStanza $ (emptyPresence PresenceAvailable)
+ { presenceID = Just $ encodePushNotification us }
+ putStanza $ emptyPresence PresenceUnavailable
+
+ receivenotifications = forever $ do
+ s <- getStanza
+ case s of
+ ReceivedPresence (Presence { presenceType = PresenceAvailable, presenceID = Just t }) ->
+ maybe noop (liftIO . pull dstatus)
+ (decodePushNotification t)
+ _ -> noop
+
+{- Everything we need to know to connect to an XMPP server. -}
+data XMPPCreds = XMPPCreds
+ { xmppUsername :: T.Text
+ , xmppPassword :: T.Text
+ , xmppHostname :: HostName
+ , xmppPort :: Int
+ }
+ deriving (Read, Show)
+
+getXMPPCreds :: Annex (Maybe XMPPCreds)
+getXMPPCreds = do
+ f <- xmppCredsFile
+ s <- liftIO $ catchMaybeIO $ readFile f
+ return $ readish =<< s
+
+setXMPPCreds :: XMPPCreds -> Annex ()
+setXMPPCreds creds = do
+ f <- xmppCredsFile
+ liftIO $ do
+ h <- openFile f WriteMode
+ modifyFileMode f $ removeModes
+ [groupReadMode, otherReadMode]
+ hPutStr h (show creds)
+ hClose h
+
+xmppCredsFile :: Annex FilePath
+xmppCredsFile = do
+ dir <- fromRepo gitAnnexCredsDir
+ return $ dir </> "notify-xmpp"
+
+{- A push notification is encoded in the id field of an XMPP presence
+ - notification, in the form: "git-annex-push:uuid[:uuid:...]
+ -
+ - Git repos can be pushed to that do not have a git-annex uuid; an empty
+ - string is used for those.
+ -}
+prefix :: T.Text
+prefix = T.pack "git-annex-push:"
+
+delim :: T.Text
+delim = T.pack ":"
+
+encodePushNotification :: [UUID] -> T.Text
+encodePushNotification us = T.concat
+ [ prefix
+ , T.intercalate delim $ map (T.pack . fromUUID) us
+ ]
+
+decodePushNotification :: T.Text -> Maybe [UUID]
+decodePushNotification t = map (toUUID . T.unpack) . T.splitOn delim
+ <$> T.stripPrefix prefix t
+
+pull :: DaemonStatusHandle -> [UUID] -> IO ()
+pull _ [] = noop
+pull dstatus us = do
+ rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
+ print ("TODO pull from", rs)
+ where
+ matching r = Remote.uuid r `S.member` s
+ s = S.fromList us
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 4f3a2dd09..295ceddc9 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -24,8 +24,8 @@ thisThread :: ThreadName
thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
-pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread
-pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
+pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
+pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
@@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
]
now <- getCurrentTime
void $ alertWhile dstatus (pushRetryAlert topush) $
- pushToRemotes thisThread now st (Just pushmap) topush
+ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
where
halfhour = 1800
thread = NamedThread thisThread
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread
-pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
+pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
@@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do
<$> getDaemonStatus dstatus
unless (null remotes) $
void $ alertWhile dstatus (pushAlert remotes) $
- pushToRemotes thisThread now st (Just pushmap) remotes
+ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
debug thisThread
[ "delaying push of"