diff options
-rw-r--r-- | Assistant/NetMessager.hs | 134 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 10 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 25 | ||||
-rw-r--r-- | Assistant/XMPP/Git.hs | 146 | ||||
-rw-r--r-- | Utility/Directory.hs | 11 | ||||
-rw-r--r-- | debian/changelog | 3 | ||||
-rw-r--r-- | doc/design/assistant/more_cloud_providers.mdwn | 3 | ||||
-rw-r--r-- | doc/design/assistant/xmpp.mdwn | 3 |
8 files changed, 207 insertions, 128 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 97d17af6e..7ee4d464b 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -1,21 +1,23 @@ {- git-annex assistant out of band network messager interface - - - Copyright 2012 Joey Hess <joey@kitenet.net> + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE BangPatterns #-} + module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager -import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Control.Exception as E import qualified Data.Set as S import qualified Data.Map as M +import qualified Data.DList as D sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager - relating to this push, while any messages relating to other pushes - on the same side go to netMessagesDeferred. Once the push finishes, - those deferred messages will be fed to handledeferred for processing. + - + - If this is called when a push of the same side is running, it will + - block until that push completes, and then run. -} -runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a -runPush side clientid handledeferred a = do +runPush :: PushSide -> ClientID -> Assistant a -> Assistant a +runPush side clientid a = do nm <- getAssistant netMessager - let runningv = getSide side $ netMessagerPushRunning nm - let setup = void $ atomically $ swapTMVar runningv $ Just clientid - let cleanup = atomically $ do - void $ swapTMVar runningv Nothing - emptytchan (getSide side $ netMessagesPush nm) + let v = getSide side $ netMessagerPushRunning nm + debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side] + let setup = do + debugmsg "preparing to run" + atomically $ ifM (isNothing <$> tryReadTMVar v) + ( putTMVar v clientid + , retry + ) + debugmsg "started running" + let cleanup = do + debugmsg "finished running" + atomically $ takeTMVar v r <- E.bracket_ setup cleanup <~> a - (void . forkIO) <~> processdeferred nm + {- Empty the inbox, because stuff may have been left in it + - if the push failed. -} + emptyInbox clientid side return r - where - emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c - processdeferred nm = do - s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty - mapM_ rundeferred (S.toList s) - rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) - <~> handledeferred m - -{- While a push is running, matching push messages are put into - - netMessagesPush, while others that involve the same side go to - - netMessagesPushDeferred. + +{- Stores messages for a push into the appropriate inbox. - - - When no push is running involving the same side, returns False. + - To avoid overflow, only 1000 messages max are stored in any + - inbox, which should be far more than necessary. - - - To avoid bloating memory, only messages that initiate pushes are - - deferred. + - TODO: If we have more than 100 inboxes for different clients, + - discard old ones that are not currently being used by any push. -} -queueNetPushMessage :: NetMessage -> Assistant Bool -queueNetPushMessage m@(Pushing clientid stage) = do - nm <- getAssistant netMessager - liftIO $ atomically $ do - v <- readTMVar (getSide side $ netMessagerPushRunning nm) - case v of - Nothing -> return False - (Just runningclientid) - | runningclientid == clientid -> queue nm - | isPushInitiation stage -> defer nm - | otherwise -> discard +storeInbox :: NetMessage -> Assistant () +storeInbox msg@(Pushing clientid stage) = do + inboxes <- getInboxes side + stored <- liftIO $ atomically $ do + m <- readTVar inboxes + let update = \v -> do + writeTVar inboxes $ + M.insertWith' const clientid v m + return True + case M.lookup clientid m of + Nothing -> update (1, tostore) + Just (sz, l) + | sz > 1000 -> return False + | otherwise -> + let !sz' = sz + 1 + !l' = D.append l tostore + in update (sz', l') + if stored + then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"] + else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"] where side = pushDestinationSide stage - queue nm = do - writeTChan (getSide side $ netMessagesPush nm) m - return True - defer nm = do - let mv = getSide side $ netMessagesPushDeferred nm - s <- takeTMVar mv - putTMVar mv $ S.insert m s - return True - discard = return True -queueNetPushMessage _ = return False - -waitNetPushMessage :: PushSide -> Assistant (NetMessage) -waitNetPushMessage side = (atomically . readTChan) - <<~ (getSide side . netMessagesPush . netMessager) + tostore = D.singleton msg +storeInbox _ = noop + +{- Gets the new message for a push from its inbox. + - Blocks until a message has been received. -} +waitInbox :: ClientID -> PushSide -> Assistant (NetMessage) +waitInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ do + m <- readTVar inboxes + case M.lookup clientid m of + Nothing -> retry + Just (sz, dl) + | sz < 1 -> retry + | otherwise -> do + let msg = D.head dl + let dl' = D.tail dl + let !sz' = sz - 1 + writeTVar inboxes $ + M.insertWith' const clientid (sz', dl') m + return msg + +emptyInbox :: ClientID -> PushSide -> Assistant () +emptyInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ + modifyTVar' inboxes $ + M.delete clientid + +getInboxes :: PushSide -> Assistant Inboxes +getInboxes side = + getSide side . netMessagesInboxes <$> getAssistant netMessager +netMessagerDebug :: ClientID -> [String] -> Assistant () +netMessagerDebug clientid l = debug $ + "NetMessager" : l ++ [show $ logClientID clientid] diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 086494a74..929b4c807 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -107,11 +107,11 @@ xmppClient urlrenderer d creds = handle selfjid (GotNetMessage (PairingNotification stage c u)) = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) handle _ (GotNetMessage m@(Pushing _ pushstage)) - | isPushInitiation pushstage = inAssistant $ - unlessM (queueNetPushMessage m) $ do - let checker = checkCloudRepos urlrenderer - void $ forkIO <~> handlePushInitiation checker m - | otherwise = void $ inAssistant $ queueNetPushMessage m + | isPushNotice pushstage = inAssistant $ handlePushNotice m + | isPushInitiation pushstage = inAssistant $ do + let checker = checkCloudRepos urlrenderer + void $ forkIO <~> handlePushInitiation checker m + | otherwise = void $ inAssistant $ storeInbox m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 09a558033..2c9de253f 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -18,6 +18,7 @@ import Data.Text (Text) import qualified Data.Text as T import qualified Data.Set as S import qualified Data.Map as M +import qualified Data.DList as D {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -85,13 +86,16 @@ logClientID c = T.concat [T.take 1 c, T.pack $ show $ T.length c] {- Things that initiate either side of a push, but do not actually send data. -} isPushInitiation :: PushStage -> Bool -isPushInitiation (CanPush _) = True isPushInitiation (PushRequest _) = True isPushInitiation (StartingPush _) = True isPushInitiation _ = False +isPushNotice :: PushStage -> Bool +isPushNotice (CanPush _) = True +isPushNotice _ = False + data PushSide = SendPack | ReceivePack - deriving (Eq, Ord) + deriving (Eq, Ord, Show) pushDestinationSide :: PushStage -> PushSide pushDestinationSide (CanPush _) = ReceivePack @@ -114,6 +118,8 @@ mkSideMap gen = do getSide :: PushSide -> SideMap a -> a getSide side m = m side +type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage)) + data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan NetMessage @@ -124,11 +130,11 @@ data NetMessager = NetMessager -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () -- only one side of a push can be running at a time - , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID)) - -- incoming messages related to a running push - , netMessagesPush :: SideMap (TChan NetMessage) - -- incoming push messages, deferred to be processed later - , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage)) + -- the TMVars are empty when nothing is running + , netMessagerPushRunning :: SideMap (TMVar ClientID) + -- incoming messages containing data for a push, + -- on a per-client and per-side basis + , netMessagesInboxes :: SideMap Inboxes } newNetMessager :: IO NetMessager @@ -137,6 +143,5 @@ newNetMessager = NetMessager <*> atomically (newTMVar M.empty) <*> atomically (newTMVar M.empty) <*> newEmptySV - <*> mkSideMap (newTMVar Nothing) - <*> mkSideMap newTChan - <*> mkSideMap (newTMVar S.empty) + <*> mkSideMap newEmptyTMVar + <*> mkSideMap (newTVar M.empty) diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index f087f6d1f..98c70cf41 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -43,6 +43,22 @@ import System.Timeout import qualified Data.ByteString as B import qualified Data.Map as M +{- Largest chunk of data to send in a single XMPP message. -} +chunkSize :: Int +chunkSize = 4096 + +{- How long to wait for an expected message before assuming the other side + - has gone away and canceling a push. + - + - This needs to be long enough to allow a message of up to 2+ times + - chunkSize to propigate up to a XMPP server, perhaps across to another + - server, and back down to us. On the other hand, other XMPP pushes can be + - delayed for running until the timeout is reached, so it should not be + - excessive. + -} +xmppTimeout :: Int +xmppTimeout = 120000000 -- 120 seconds + finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing jid u = void $ alertWhile alert $ makeXMPPGitRemote buddy (baseJID jid) u @@ -83,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do - - We listen at the other end of the pipe and relay to and from XMPP. -} -xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool -xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do +xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool +xmppPush cid gitpush = runPush SendPack cid $ do u <- liftAnnex getUUID sendNetMessage $ Pushing cid (StartingPush u) @@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do sendNetMessage $ Pushing cid $ SendPackOutput seqnum' b toxmpp seqnum' inh - fromxmpp outh controlh = forever $ do - m <- timeout xmppTimeout <~> waitNetPushMessage SendPack - case m of - (Just (Pushing _ (ReceivePackOutput _ b))) -> - liftIO $ writeChunk outh b - (Just (Pushing _ (ReceivePackDone exitcode))) -> - liftIO $ do - hPrint controlh exitcode - hFlush controlh - (Just _) -> noop - Nothing -> do - debug ["timeout waiting for git receive-pack output via XMPP"] - -- Send a synthetic exit code to git-annex - -- xmppgit, which will exit and cause git push - -- to die. - liftIO $ do - hPrint controlh (ExitFailure 1) - hFlush controlh + + fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle + where + handle (Just (Pushing _ (ReceivePackOutput _ b))) = + liftIO $ writeChunk outh b + handle (Just (Pushing _ (ReceivePackDone exitcode))) = + liftIO $ do + hPrint controlh exitcode + hFlush controlh + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git receive-pack output via XMPP"] + -- Send a synthetic exit code to git-annex + -- xmppgit, which will exit and cause git push + -- to die. + liftIO $ do + hPrint controlh (ExitFailure 1) + hFlush controlh + installwrapper tmpdir = liftIO $ do createDirectoryIfMissing True tmpdir let wrapper = tmpdir </> "git-remote-xmpp" @@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do , "exec " ++ program ++ " xmppgit" ] modifyFileMode wrapper $ addModes executeModes + {- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp - dir (ie, not on a crippled filesystem where we can't make - the wrapper executable). -} @@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do tmp <- liftAnnex $ fromRepo gitAnnexTmpDir return $ tmp </> "xmppgit" Just d -> return $ d </> "xmppgit" - type EnvVar = String @@ -219,8 +236,8 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} -xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool -xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do +xmppReceivePack :: ClientID -> Assistant Bool +xmppReceivePack cid = runPush ReceivePack cid $ do repodir <- liftAnnex $ fromRepo repoPath let p = (proc "git" ["receive-pack", repodir]) { std_in = CreatePipe @@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do let seqnum' = succ seqnum sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b relaytoxmpp seqnum' outh - relayfromxmpp inh = forever $ do - m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack - case m of - (Just (Pushing _ (SendPackOutput _ b))) -> - liftIO $ writeChunk inh b - (Just _) -> noop - Nothing -> do - debug ["timeout waiting for git send-pack output via XMPP"] - -- closing the handle will make - -- git receive-pack exit - liftIO $ do - hClose inh - killThread =<< myThreadId + relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle + where + handle (Just (Pushing _ (SendPackOutput _ b))) = + liftIO $ writeChunk inh b + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git send-pack output via XMPP"] + -- closing the handle will make git receive-pack exit + liftIO $ do + hClose inh + killThread =<< myThreadId xmppRemotes :: ClientID -> UUID -> Assistant [Remote] xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of @@ -272,10 +287,6 @@ xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um handlePushInitiation :: (Remote -> Assistant ()) -> NetMessage -> Assistant () -handlePushInitiation _ (Pushing cid (CanPush theiruuid)) = - unlessM (null <$> xmppRemotes cid theiruuid) $ do - u <- liftAnnex getUUID - sendNetMessage $ Pushing cid (PushRequest u) handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = go =<< liftAnnex (inRepo Git.Branch.current) where @@ -290,38 +301,55 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus forM_ rs $ \r -> do void $ alertWhile (syncAlert [r]) $ - xmppPush cid - (taggedPush u selfjid branch r) - (handleDeferred checkcloudrepos) + xmppPush cid (taggedPush u selfjid branch r) checkcloudrepos r handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do rs <- xmppRemotes cid theiruuid unless (null rs) $ do void $ alertWhile (syncAlert rs) $ - xmppReceivePack cid (handleDeferred checkcloudrepos) + xmppReceivePack cid mapM_ checkcloudrepos rs handlePushInitiation _ _ = noop -handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant () -handleDeferred = handlePushInitiation +handlePushNotice :: NetMessage -> Assistant () +handlePushNotice (Pushing cid (CanPush theiruuid)) = + unlessM (null <$> xmppRemotes cid theiruuid) $ do + u <- liftAnnex getUUID + sendNetMessage $ Pushing cid (PushRequest u) +handlePushNotice _ = noop writeChunk :: Handle -> B.ByteString -> IO () writeChunk h b = do B.hPut h b hFlush h -{- Largest chunk of data to send in a single XMPP message. -} -chunkSize :: Int -chunkSize = 4096 - -{- How long to wait for an expected message before assuming the other side - - has gone away and canceling a push. +{- Gets NetMessages for a PushSide, ensures they are in order, + - and runs an action to handle each in turn. The action will be passed + - Nothing on timeout. - - - This needs to be long enough to allow a message of up to 2+ times - - chunkSize to propigate up to a XMPP server, perhaps across to another - - server, and back down to us. On the other hand, other XMPP pushes can be - - delayed for running until the timeout is reached, so it should not be - - excessive. + - Does not currently reorder messages, but does ensure that any + - duplicate messages, or messages not in the sequence, are discarded. -} -xmppTimeout :: Int -xmppTimeout = 120000000 -- 120 seconds +withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant () +withPushMessagesInSequence cid side a = loop 0 + where + loop seqnum = do + m <- timeout xmppTimeout <~> waitInbox cid side + let go s = a m >> loop s + case extractSequence =<< m of + Just seqnum' + | seqnum' == seqnum + 1 -> go seqnum' + | seqnum' == 0 -> go seqnum + | seqnum' == seqnum -> do + debug ["ignoring duplicate sequence number", show seqnum] + loop seqnum + | otherwise -> do + debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum] + loop seqnum + Nothing -> go seqnum + +extractSequence :: NetMessage -> Maybe Int +extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum +extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum +extractSequence _ = Nothing + diff --git a/Utility/Directory.hs b/Utility/Directory.hs index 9477ad5b9..0a7690b44 100644 --- a/Utility/Directory.hs +++ b/Utility/Directory.hs @@ -5,6 +5,8 @@ - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE CPP #-} + module Utility.Directory where import System.IO.Error @@ -85,9 +87,14 @@ moveFile src dest = tryIO (rename src dest) >>= onrename (Left _) -> return False (Right s) -> return $ isDirectory s -{- Removes a file, which may or may not exist. +{- Removes a file, which may or may not exist, and does not have to + - be a regular file. - - Note that an exception is thrown if the file exists but - cannot be removed. -} nukeFile :: FilePath -> IO () -nukeFile file = whenM (doesFileExist file) $ removeFile file +#ifndef mingw32_HOST_OS +nukeFile = removeLink +#else +nukeFile = removeFile +#endif diff --git a/debian/changelog b/debian/changelog index 30b61700b..7b8476727 100644 --- a/debian/changelog +++ b/debian/changelog @@ -22,6 +22,9 @@ git-annex (4.20130517) UNRELEASED; urgency=low * OSX: Fixed gpg included in dmg. * Linux standalone: Back to being built with glibc 2.13 for maximum portability. + * XMPP: Ignore duplicate messages received when pushing. + * XMPP: Be better at responding to CanPush messages when busy with + something else. -- Joey Hess <joeyh@debian.org> Fri, 17 May 2013 11:17:03 -0400 diff --git a/doc/design/assistant/more_cloud_providers.mdwn b/doc/design/assistant/more_cloud_providers.mdwn index 7949f8a7e..52372e06d 100644 --- a/doc/design/assistant/more_cloud_providers.mdwn +++ b/doc/design/assistant/more_cloud_providers.mdwn @@ -15,6 +15,7 @@ More should be added, such as: REST API; free software * Mediafire provides 50gb free and has a REST API. * Flickr provides 1 tb (!!!!) to free accounts, and can store at least - photos and videos. + photos and videos. <https://github.com/ricardobeat/filr> is a hack + to allow storing any type of file on Flickr. See poll at [[polls/prioritizing_special_remotes]]. diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn index f37ff9c46..5ee8bc508 100644 --- a/doc/design/assistant/xmpp.mdwn +++ b/doc/design/assistant/xmpp.mdwn @@ -96,8 +96,7 @@ one or more chat messages, directed to the receiver: The value of rp and sp used to be empty, but now it's a sequence number. This indicates the sequence of this packet, counting from 1. The receiver -and sender each have their own sequence numbers. These sequence numbers -are not really used yet, but are available for debugging. +and sender each have their own sequence numbers. When `git receive-pack` exits, the receiver indicates its exit status with a chat message, directed at the sender: |