aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/NetMessager.hs134
-rw-r--r--Assistant/Threads/XMPPClient.hs10
-rw-r--r--Assistant/Types/NetMessager.hs25
-rw-r--r--Assistant/XMPP/Git.hs146
-rw-r--r--Utility/Directory.hs11
-rw-r--r--debian/changelog3
-rw-r--r--doc/design/assistant/more_cloud_providers.mdwn3
-rw-r--r--doc/design/assistant/xmpp.mdwn3
8 files changed, 207 insertions, 128 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 97d17af6e..7ee4d464b 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -1,21 +1,23 @@
{- git-annex assistant out of band network messager interface
-
- - Copyright 2012 Joey Hess <joey@kitenet.net>
+ - Copyright 2012-2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
-import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
+ -
+ - If this is called when a push of the same side is running, it will
+ - block until that push completes, and then run.
-}
-runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush side clientid handledeferred a = do
+runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
+runPush side clientid a = do
nm <- getAssistant netMessager
- let runningv = getSide side $ netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar runningv $ Just clientid
- let cleanup = atomically $ do
- void $ swapTMVar runningv Nothing
- emptytchan (getSide side $ netMessagesPush nm)
+ let v = getSide side $ netMessagerPushRunning nm
+ debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
+ let setup = do
+ debugmsg "preparing to run"
+ atomically $ ifM (isNothing <$> tryReadTMVar v)
+ ( putTMVar v clientid
+ , retry
+ )
+ debugmsg "started running"
+ let cleanup = do
+ debugmsg "finished running"
+ atomically $ takeTMVar v
r <- E.bracket_ setup cleanup <~> a
- (void . forkIO) <~> processdeferred nm
+ {- Empty the inbox, because stuff may have been left in it
+ - if the push failed. -}
+ emptyInbox clientid side
return r
- where
- emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
- processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
- mapM_ rundeferred (S.toList s)
- rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
- <~> handledeferred m
-
-{- While a push is running, matching push messages are put into
- - netMessagesPush, while others that involve the same side go to
- - netMessagesPushDeferred.
+
+{- Stores messages for a push into the appropriate inbox.
-
- - When no push is running involving the same side, returns False.
+ - To avoid overflow, only 1000 messages max are stored in any
+ - inbox, which should be far more than necessary.
-
- - To avoid bloating memory, only messages that initiate pushes are
- - deferred.
+ - TODO: If we have more than 100 inboxes for different clients,
+ - discard old ones that are not currently being used by any push.
-}
-queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m@(Pushing clientid stage) = do
- nm <- getAssistant netMessager
- liftIO $ atomically $ do
- v <- readTMVar (getSide side $ netMessagerPushRunning nm)
- case v of
- Nothing -> return False
- (Just runningclientid)
- | runningclientid == clientid -> queue nm
- | isPushInitiation stage -> defer nm
- | otherwise -> discard
+storeInbox :: NetMessage -> Assistant ()
+storeInbox msg@(Pushing clientid stage) = do
+ inboxes <- getInboxes side
+ stored <- liftIO $ atomically $ do
+ m <- readTVar inboxes
+ let update = \v -> do
+ writeTVar inboxes $
+ M.insertWith' const clientid v m
+ return True
+ case M.lookup clientid m of
+ Nothing -> update (1, tostore)
+ Just (sz, l)
+ | sz > 1000 -> return False
+ | otherwise ->
+ let !sz' = sz + 1
+ !l' = D.append l tostore
+ in update (sz', l')
+ if stored
+ then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
+ else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
where
side = pushDestinationSide stage
- queue nm = do
- writeTChan (getSide side $ netMessagesPush nm) m
- return True
- defer nm = do
- let mv = getSide side $ netMessagesPushDeferred nm
- s <- takeTMVar mv
- putTMVar mv $ S.insert m s
- return True
- discard = return True
-queueNetPushMessage _ = return False
-
-waitNetPushMessage :: PushSide -> Assistant (NetMessage)
-waitNetPushMessage side = (atomically . readTChan)
- <<~ (getSide side . netMessagesPush . netMessager)
+ tostore = D.singleton msg
+storeInbox _ = noop
+
+{- Gets the new message for a push from its inbox.
+ - Blocks until a message has been received. -}
+waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
+waitInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $ do
+ m <- readTVar inboxes
+ case M.lookup clientid m of
+ Nothing -> retry
+ Just (sz, dl)
+ | sz < 1 -> retry
+ | otherwise -> do
+ let msg = D.head dl
+ let dl' = D.tail dl
+ let !sz' = sz - 1
+ writeTVar inboxes $
+ M.insertWith' const clientid (sz', dl') m
+ return msg
+
+emptyInbox :: ClientID -> PushSide -> Assistant ()
+emptyInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $
+ modifyTVar' inboxes $
+ M.delete clientid
+
+getInboxes :: PushSide -> Assistant Inboxes
+getInboxes side =
+ getSide side . netMessagesInboxes <$> getAssistant netMessager
+netMessagerDebug :: ClientID -> [String] -> Assistant ()
+netMessagerDebug clientid l = debug $
+ "NetMessager" : l ++ [show $ logClientID clientid]
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 086494a74..929b4c807 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -107,11 +107,11 @@ xmppClient urlrenderer d creds =
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage))
- | isPushInitiation pushstage = inAssistant $
- unlessM (queueNetPushMessage m) $ do
- let checker = checkCloudRepos urlrenderer
- void $ forkIO <~> handlePushInitiation checker m
- | otherwise = void $ inAssistant $ queueNetPushMessage m
+ | isPushNotice pushstage = inAssistant $ handlePushNotice m
+ | isPushInitiation pushstage = inAssistant $ do
+ let checker = checkCloudRepos urlrenderer
+ void $ forkIO <~> handlePushInitiation checker m
+ | otherwise = void $ inAssistant $ storeInbox m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index 09a558033..2c9de253f 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -18,6 +18,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -85,13 +86,16 @@ logClientID c = T.concat [T.take 1 c, T.pack $ show $ T.length c]
{- Things that initiate either side of a push, but do not actually send data. -}
isPushInitiation :: PushStage -> Bool
-isPushInitiation (CanPush _) = True
isPushInitiation (PushRequest _) = True
isPushInitiation (StartingPush _) = True
isPushInitiation _ = False
+isPushNotice :: PushStage -> Bool
+isPushNotice (CanPush _) = True
+isPushNotice _ = False
+
data PushSide = SendPack | ReceivePack
- deriving (Eq, Ord)
+ deriving (Eq, Ord, Show)
pushDestinationSide :: PushStage -> PushSide
pushDestinationSide (CanPush _) = ReceivePack
@@ -114,6 +118,8 @@ mkSideMap gen = do
getSide :: PushSide -> SideMap a -> a
getSide side m = m side
+type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))
+
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan NetMessage
@@ -124,11 +130,11 @@ data NetMessager = NetMessager
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time
- , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
- -- incoming messages related to a running push
- , netMessagesPush :: SideMap (TChan NetMessage)
- -- incoming push messages, deferred to be processed later
- , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
+ -- the TMVars are empty when nothing is running
+ , netMessagerPushRunning :: SideMap (TMVar ClientID)
+ -- incoming messages containing data for a push,
+ -- on a per-client and per-side basis
+ , netMessagesInboxes :: SideMap Inboxes
}
newNetMessager :: IO NetMessager
@@ -137,6 +143,5 @@ newNetMessager = NetMessager
<*> atomically (newTMVar M.empty)
<*> atomically (newTMVar M.empty)
<*> newEmptySV
- <*> mkSideMap (newTMVar Nothing)
- <*> mkSideMap newTChan
- <*> mkSideMap (newTMVar S.empty)
+ <*> mkSideMap newEmptyTMVar
+ <*> mkSideMap (newTVar M.empty)
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index f087f6d1f..98c70cf41 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -43,6 +43,22 @@ import System.Timeout
import qualified Data.ByteString as B
import qualified Data.Map as M
+{- Largest chunk of data to send in a single XMPP message. -}
+chunkSize :: Int
+chunkSize = 4096
+
+{- How long to wait for an expected message before assuming the other side
+ - has gone away and canceling a push.
+ -
+ - This needs to be long enough to allow a message of up to 2+ times
+ - chunkSize to propigate up to a XMPP server, perhaps across to another
+ - server, and back down to us. On the other hand, other XMPP pushes can be
+ - delayed for running until the timeout is reached, so it should not be
+ - excessive.
+ -}
+xmppTimeout :: Int
+xmppTimeout = 120000000 -- 120 seconds
+
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
makeXMPPGitRemote buddy (baseJID jid) u
@@ -83,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
-xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
+xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
+xmppPush cid gitpush = runPush SendPack cid $ do
u <- liftAnnex getUUID
sendNetMessage $ Pushing cid (StartingPush u)
@@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
sendNetMessage $ Pushing cid $
SendPackOutput seqnum' b
toxmpp seqnum' inh
- fromxmpp outh controlh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage SendPack
- case m of
- (Just (Pushing _ (ReceivePackOutput _ b))) ->
- liftIO $ writeChunk outh b
- (Just (Pushing _ (ReceivePackDone exitcode))) ->
- liftIO $ do
- hPrint controlh exitcode
- hFlush controlh
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git receive-pack output via XMPP"]
- -- Send a synthetic exit code to git-annex
- -- xmppgit, which will exit and cause git push
- -- to die.
- liftIO $ do
- hPrint controlh (ExitFailure 1)
- hFlush controlh
+
+ fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
+ where
+ handle (Just (Pushing _ (ReceivePackOutput _ b))) =
+ liftIO $ writeChunk outh b
+ handle (Just (Pushing _ (ReceivePackDone exitcode))) =
+ liftIO $ do
+ hPrint controlh exitcode
+ hFlush controlh
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git receive-pack output via XMPP"]
+ -- Send a synthetic exit code to git-annex
+ -- xmppgit, which will exit and cause git push
+ -- to die.
+ liftIO $ do
+ hPrint controlh (ExitFailure 1)
+ hFlush controlh
+
installwrapper tmpdir = liftIO $ do
createDirectoryIfMissing True tmpdir
let wrapper = tmpdir </> "git-remote-xmpp"
@@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
, "exec " ++ program ++ " xmppgit"
]
modifyFileMode wrapper $ addModes executeModes
+
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
- dir (ie, not on a crippled filesystem where we can't make
- the wrapper executable). -}
@@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
return $ tmp </> "xmppgit"
Just d -> return $ d </> "xmppgit"
-
type EnvVar = String
@@ -219,8 +236,8 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
-xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
+xmppReceivePack :: ClientID -> Assistant Bool
+xmppReceivePack cid = runPush ReceivePack cid $ do
repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe
@@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh
- relayfromxmpp inh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack
- case m of
- (Just (Pushing _ (SendPackOutput _ b))) ->
- liftIO $ writeChunk inh b
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git send-pack output via XMPP"]
- -- closing the handle will make
- -- git receive-pack exit
- liftIO $ do
- hClose inh
- killThread =<< myThreadId
+ relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
+ where
+ handle (Just (Pushing _ (SendPackOutput _ b))) =
+ liftIO $ writeChunk inh b
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git send-pack output via XMPP"]
+ -- closing the handle will make git receive-pack exit
+ liftIO $ do
+ hClose inh
+ killThread =<< myThreadId
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
@@ -272,10 +287,6 @@ xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um
handlePushInitiation :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
-handlePushInitiation _ (Pushing cid (CanPush theiruuid)) =
- unlessM (null <$> xmppRemotes cid theiruuid) $ do
- u <- liftAnnex getUUID
- sendNetMessage $ Pushing cid (PushRequest u)
handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
go =<< liftAnnex (inRepo Git.Branch.current)
where
@@ -290,38 +301,55 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
forM_ rs $ \r -> do
void $ alertWhile (syncAlert [r]) $
- xmppPush cid
- (taggedPush u selfjid branch r)
- (handleDeferred checkcloudrepos)
+ xmppPush cid (taggedPush u selfjid branch r)
checkcloudrepos r
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
rs <- xmppRemotes cid theiruuid
unless (null rs) $ do
void $ alertWhile (syncAlert rs) $
- xmppReceivePack cid (handleDeferred checkcloudrepos)
+ xmppReceivePack cid
mapM_ checkcloudrepos rs
handlePushInitiation _ _ = noop
-handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
-handleDeferred = handlePushInitiation
+handlePushNotice :: NetMessage -> Assistant ()
+handlePushNotice (Pushing cid (CanPush theiruuid)) =
+ unlessM (null <$> xmppRemotes cid theiruuid) $ do
+ u <- liftAnnex getUUID
+ sendNetMessage $ Pushing cid (PushRequest u)
+handlePushNotice _ = noop
writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do
B.hPut h b
hFlush h
-{- Largest chunk of data to send in a single XMPP message. -}
-chunkSize :: Int
-chunkSize = 4096
-
-{- How long to wait for an expected message before assuming the other side
- - has gone away and canceling a push.
+{- Gets NetMessages for a PushSide, ensures they are in order,
+ - and runs an action to handle each in turn. The action will be passed
+ - Nothing on timeout.
-
- - This needs to be long enough to allow a message of up to 2+ times
- - chunkSize to propigate up to a XMPP server, perhaps across to another
- - server, and back down to us. On the other hand, other XMPP pushes can be
- - delayed for running until the timeout is reached, so it should not be
- - excessive.
+ - Does not currently reorder messages, but does ensure that any
+ - duplicate messages, or messages not in the sequence, are discarded.
-}
-xmppTimeout :: Int
-xmppTimeout = 120000000 -- 120 seconds
+withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
+withPushMessagesInSequence cid side a = loop 0
+ where
+ loop seqnum = do
+ m <- timeout xmppTimeout <~> waitInbox cid side
+ let go s = a m >> loop s
+ case extractSequence =<< m of
+ Just seqnum'
+ | seqnum' == seqnum + 1 -> go seqnum'
+ | seqnum' == 0 -> go seqnum
+ | seqnum' == seqnum -> do
+ debug ["ignoring duplicate sequence number", show seqnum]
+ loop seqnum
+ | otherwise -> do
+ debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum]
+ loop seqnum
+ Nothing -> go seqnum
+
+extractSequence :: NetMessage -> Maybe Int
+extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
+extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
+extractSequence _ = Nothing
+
diff --git a/Utility/Directory.hs b/Utility/Directory.hs
index 9477ad5b9..0a7690b44 100644
--- a/Utility/Directory.hs
+++ b/Utility/Directory.hs
@@ -5,6 +5,8 @@
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE CPP #-}
+
module Utility.Directory where
import System.IO.Error
@@ -85,9 +87,14 @@ moveFile src dest = tryIO (rename src dest) >>= onrename
(Left _) -> return False
(Right s) -> return $ isDirectory s
-{- Removes a file, which may or may not exist.
+{- Removes a file, which may or may not exist, and does not have to
+ - be a regular file.
-
- Note that an exception is thrown if the file exists but
- cannot be removed. -}
nukeFile :: FilePath -> IO ()
-nukeFile file = whenM (doesFileExist file) $ removeFile file
+#ifndef mingw32_HOST_OS
+nukeFile = removeLink
+#else
+nukeFile = removeFile
+#endif
diff --git a/debian/changelog b/debian/changelog
index 30b61700b..7b8476727 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -22,6 +22,9 @@ git-annex (4.20130517) UNRELEASED; urgency=low
* OSX: Fixed gpg included in dmg.
* Linux standalone: Back to being built with glibc 2.13 for maximum
portability.
+ * XMPP: Ignore duplicate messages received when pushing.
+ * XMPP: Be better at responding to CanPush messages when busy with
+ something else.
-- Joey Hess <joeyh@debian.org> Fri, 17 May 2013 11:17:03 -0400
diff --git a/doc/design/assistant/more_cloud_providers.mdwn b/doc/design/assistant/more_cloud_providers.mdwn
index 7949f8a7e..52372e06d 100644
--- a/doc/design/assistant/more_cloud_providers.mdwn
+++ b/doc/design/assistant/more_cloud_providers.mdwn
@@ -15,6 +15,7 @@ More should be added, such as:
REST API; free software
* Mediafire provides 50gb free and has a REST API.
* Flickr provides 1 tb (!!!!) to free accounts, and can store at least
- photos and videos.
+ photos and videos. <https://github.com/ricardobeat/filr> is a hack
+ to allow storing any type of file on Flickr.
See poll at [[polls/prioritizing_special_remotes]].
diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn
index f37ff9c46..5ee8bc508 100644
--- a/doc/design/assistant/xmpp.mdwn
+++ b/doc/design/assistant/xmpp.mdwn
@@ -96,8 +96,7 @@ one or more chat messages, directed to the receiver:
The value of rp and sp used to be empty, but now it's a sequence number.
This indicates the sequence of this packet, counting from 1. The receiver
-and sender each have their own sequence numbers. These sequence numbers
-are not really used yet, but are available for debugging.
+and sender each have their own sequence numbers.
When `git receive-pack` exits, the receiver indicates its exit
status with a chat message, directed at the sender: