aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-05-20 21:58:59 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-05-20 21:58:59 -0400
commitc65c4fc5202c9f94ad5ecd4d6d6a138ad923232d (patch)
tree0880487f52d9d4fea64fec704e97c65d77e9cf67
parent379cbe9c17211b43a26aa07e77a179438dbee174 (diff)
XMPP: Ignore duplicate messages received when pushing.
-rw-r--r--Assistant/XMPP/Git.hs120
-rw-r--r--debian/changelog1
-rw-r--r--doc/design/assistant/xmpp.mdwn3
3 files changed, 77 insertions, 47 deletions
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index f087f6d1f..93479014d 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -43,6 +43,22 @@ import System.Timeout
import qualified Data.ByteString as B
import qualified Data.Map as M
+{- Largest chunk of data to send in a single XMPP message. -}
+chunkSize :: Int
+chunkSize = 4096
+
+{- How long to wait for an expected message before assuming the other side
+ - has gone away and canceling a push.
+ -
+ - This needs to be long enough to allow a message of up to 2+ times
+ - chunkSize to propigate up to a XMPP server, perhaps across to another
+ - server, and back down to us. On the other hand, other XMPP pushes can be
+ - delayed for running until the timeout is reached, so it should not be
+ - excessive.
+ -}
+xmppTimeout :: Int
+xmppTimeout = 120000000 -- 120 seconds
+
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
makeXMPPGitRemote buddy (baseJID jid) u
@@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
sendNetMessage $ Pushing cid $
SendPackOutput seqnum' b
toxmpp seqnum' inh
- fromxmpp outh controlh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage SendPack
- case m of
- (Just (Pushing _ (ReceivePackOutput _ b))) ->
- liftIO $ writeChunk outh b
- (Just (Pushing _ (ReceivePackDone exitcode))) ->
- liftIO $ do
- hPrint controlh exitcode
- hFlush controlh
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git receive-pack output via XMPP"]
- -- Send a synthetic exit code to git-annex
- -- xmppgit, which will exit and cause git push
- -- to die.
- liftIO $ do
- hPrint controlh (ExitFailure 1)
- hFlush controlh
+
+ fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
+ where
+ handle (Just (Pushing _ (ReceivePackOutput _ b))) =
+ liftIO $ writeChunk outh b
+ handle (Just (Pushing _ (ReceivePackDone exitcode))) =
+ liftIO $ do
+ hPrint controlh exitcode
+ hFlush controlh
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git receive-pack output via XMPP"]
+ -- Send a synthetic exit code to git-annex
+ -- xmppgit, which will exit and cause git push
+ -- to die.
+ liftIO $ do
+ hPrint controlh (ExitFailure 1)
+ hFlush controlh
+
installwrapper tmpdir = liftIO $ do
createDirectoryIfMissing True tmpdir
let wrapper = tmpdir </> "git-remote-xmpp"
@@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
, "exec " ++ program ++ " xmppgit"
]
modifyFileMode wrapper $ addModes executeModes
+
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
- dir (ie, not on a crippled filesystem where we can't make
- the wrapper executable). -}
@@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
return $ tmp </> "xmppgit"
Just d -> return $ d </> "xmppgit"
-
type EnvVar = String
@@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh
- relayfromxmpp inh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack
- case m of
- (Just (Pushing _ (SendPackOutput _ b))) ->
- liftIO $ writeChunk inh b
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git send-pack output via XMPP"]
- -- closing the handle will make
- -- git receive-pack exit
- liftIO $ do
- hClose inh
- killThread =<< myThreadId
+ relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
+ where
+ handle (Just (Pushing _ (SendPackOutput _ b))) =
+ liftIO $ writeChunk inh b
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git send-pack output via XMPP"]
+ -- closing the handle will make git receive-pack exit
+ liftIO $ do
+ hClose inh
+ killThread =<< myThreadId
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
@@ -310,18 +325,33 @@ writeChunk h b = do
B.hPut h b
hFlush h
-{- Largest chunk of data to send in a single XMPP message. -}
-chunkSize :: Int
-chunkSize = 4096
-
-{- How long to wait for an expected message before assuming the other side
- - has gone away and canceling a push.
+{- Gets NetMessages for a PushSide, ensures they are in order,
+ - and runs an action to handle each in turn. The action will be passed
+ - Nothing on timeout.
-
- - This needs to be long enough to allow a message of up to 2+ times
- - chunkSize to propigate up to a XMPP server, perhaps across to another
- - server, and back down to us. On the other hand, other XMPP pushes can be
- - delayed for running until the timeout is reached, so it should not be
- - excessive.
+ - Does not currently reorder messages, but does ensure that any
+ - duplicate messages, or messages not in the sequence, are discarded.
-}
-xmppTimeout :: Int
-xmppTimeout = 120000000 -- 120 seconds
+withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
+withPushMessagesInSequence side a = loop 0
+ where
+ loop seqnum = do
+ m <- timeout xmppTimeout <~> waitNetPushMessage side
+ let go s = a m >> loop s
+ case extractSequence =<< m of
+ Just seqnum'
+ | seqnum' == seqnum + 1 -> go seqnum'
+ | seqnum' == 0 -> go seqnum
+ | seqnum' == seqnum -> do
+ debug ["ignoring duplicate sequence number", show seqnum]
+ loop seqnum
+ | otherwise -> do
+ debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum]
+ loop seqnum
+ Nothing -> go seqnum
+
+extractSequence :: NetMessage -> Maybe Int
+extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
+extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
+extractSequence _ = Nothing
+
diff --git a/debian/changelog b/debian/changelog
index 30b61700b..97e8a5a87 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -22,6 +22,7 @@ git-annex (4.20130517) UNRELEASED; urgency=low
* OSX: Fixed gpg included in dmg.
* Linux standalone: Back to being built with glibc 2.13 for maximum
portability.
+ * XMPP: Ignore duplicate messages received when pushing.
-- Joey Hess <joeyh@debian.org> Fri, 17 May 2013 11:17:03 -0400
diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn
index f37ff9c46..5ee8bc508 100644
--- a/doc/design/assistant/xmpp.mdwn
+++ b/doc/design/assistant/xmpp.mdwn
@@ -96,8 +96,7 @@ one or more chat messages, directed to the receiver:
The value of rp and sp used to be empty, but now it's a sequence number.
This indicates the sequence of this packet, counting from 1. The receiver
-and sender each have their own sequence numbers. These sequence numbers
-are not really used yet, but are available for debugging.
+and sender each have their own sequence numbers.
When `git receive-pack` exits, the receiver indicates its exit
status with a chat message, directed at the sender: