From c65c4fc5202c9f94ad5ecd4d6d6a138ad923232d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 20 May 2013 21:58:59 -0400 Subject: XMPP: Ignore duplicate messages received when pushing. --- Assistant/XMPP/Git.hs | 120 +++++++++++++++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 45 deletions(-) (limited to 'Assistant') diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index f087f6d1f..93479014d 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -43,6 +43,22 @@ import System.Timeout import qualified Data.ByteString as B import qualified Data.Map as M +{- Largest chunk of data to send in a single XMPP message. -} +chunkSize :: Int +chunkSize = 4096 + +{- How long to wait for an expected message before assuming the other side + - has gone away and canceling a push. + - + - This needs to be long enough to allow a message of up to 2+ times + - chunkSize to propigate up to a XMPP server, perhaps across to another + - server, and back down to us. On the other hand, other XMPP pushes can be + - delayed for running until the timeout is reached, so it should not be + - excessive. + -} +xmppTimeout :: Int +xmppTimeout = 120000000 -- 120 seconds + finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing jid u = void $ alertWhile alert $ makeXMPPGitRemote buddy (baseJID jid) u @@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do sendNetMessage $ Pushing cid $ SendPackOutput seqnum' b toxmpp seqnum' inh - fromxmpp outh controlh = forever $ do - m <- timeout xmppTimeout <~> waitNetPushMessage SendPack - case m of - (Just (Pushing _ (ReceivePackOutput _ b))) -> - liftIO $ writeChunk outh b - (Just (Pushing _ (ReceivePackDone exitcode))) -> - liftIO $ do - hPrint controlh exitcode - hFlush controlh - (Just _) -> noop - Nothing -> do - debug ["timeout waiting for git receive-pack output via XMPP"] - -- Send a synthetic exit code to git-annex - -- xmppgit, which will exit and cause git push - -- to die. - liftIO $ do - hPrint controlh (ExitFailure 1) - hFlush controlh + + fromxmpp outh controlh = withPushMessagesInSequence SendPack handle + where + handle (Just (Pushing _ (ReceivePackOutput _ b))) = + liftIO $ writeChunk outh b + handle (Just (Pushing _ (ReceivePackDone exitcode))) = + liftIO $ do + hPrint controlh exitcode + hFlush controlh + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git receive-pack output via XMPP"] + -- Send a synthetic exit code to git-annex + -- xmppgit, which will exit and cause git push + -- to die. + liftIO $ do + hPrint controlh (ExitFailure 1) + hFlush controlh + installwrapper tmpdir = liftIO $ do createDirectoryIfMissing True tmpdir let wrapper = tmpdir "git-remote-xmpp" @@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do , "exec " ++ program ++ " xmppgit" ] modifyFileMode wrapper $ addModes executeModes + {- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp - dir (ie, not on a crippled filesystem where we can't make - the wrapper executable). -} @@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do tmp <- liftAnnex $ fromRepo gitAnnexTmpDir return $ tmp "xmppgit" Just d -> return $ d "xmppgit" - type EnvVar = String @@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do let seqnum' = succ seqnum sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b relaytoxmpp seqnum' outh - relayfromxmpp inh = forever $ do - m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack - case m of - (Just (Pushing _ (SendPackOutput _ b))) -> - liftIO $ writeChunk inh b - (Just _) -> noop - Nothing -> do - debug ["timeout waiting for git send-pack output via XMPP"] - -- closing the handle will make - -- git receive-pack exit - liftIO $ do - hClose inh - killThread =<< myThreadId + relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle + where + handle (Just (Pushing _ (SendPackOutput _ b))) = + liftIO $ writeChunk inh b + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git send-pack output via XMPP"] + -- closing the handle will make git receive-pack exit + liftIO $ do + hClose inh + killThread =<< myThreadId xmppRemotes :: ClientID -> UUID -> Assistant [Remote] xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of @@ -310,18 +325,33 @@ writeChunk h b = do B.hPut h b hFlush h -{- Largest chunk of data to send in a single XMPP message. -} -chunkSize :: Int -chunkSize = 4096 - -{- How long to wait for an expected message before assuming the other side - - has gone away and canceling a push. +{- Gets NetMessages for a PushSide, ensures they are in order, + - and runs an action to handle each in turn. The action will be passed + - Nothing on timeout. - - - This needs to be long enough to allow a message of up to 2+ times - - chunkSize to propigate up to a XMPP server, perhaps across to another - - server, and back down to us. On the other hand, other XMPP pushes can be - - delayed for running until the timeout is reached, so it should not be - - excessive. + - Does not currently reorder messages, but does ensure that any + - duplicate messages, or messages not in the sequence, are discarded. -} -xmppTimeout :: Int -xmppTimeout = 120000000 -- 120 seconds +withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant () +withPushMessagesInSequence side a = loop 0 + where + loop seqnum = do + m <- timeout xmppTimeout <~> waitNetPushMessage side + let go s = a m >> loop s + case extractSequence =<< m of + Just seqnum' + | seqnum' == seqnum + 1 -> go seqnum' + | seqnum' == 0 -> go seqnum + | seqnum' == seqnum -> do + debug ["ignoring duplicate sequence number", show seqnum] + loop seqnum + | otherwise -> do + debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum] + loop seqnum + Nothing -> go seqnum + +extractSequence :: NetMessage -> Maybe Int +extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum +extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum +extractSequence _ = Nothing + -- cgit v1.2.3