summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-05-20 21:58:59 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-05-20 21:58:59 -0400
commitc65c4fc5202c9f94ad5ecd4d6d6a138ad923232d (patch)
tree0880487f52d9d4fea64fec704e97c65d77e9cf67 /Assistant
parent379cbe9c17211b43a26aa07e77a179438dbee174 (diff)
XMPP: Ignore duplicate messages received when pushing.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/XMPP/Git.hs120
1 files changed, 75 insertions, 45 deletions
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index f087f6d1f..93479014d 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -43,6 +43,22 @@ import System.Timeout
import qualified Data.ByteString as B
import qualified Data.Map as M
+{- Largest chunk of data to send in a single XMPP message. -}
+chunkSize :: Int
+chunkSize = 4096
+
+{- How long to wait for an expected message before assuming the other side
+ - has gone away and canceling a push.
+ -
+ - This needs to be long enough to allow a message of up to 2+ times
+ - chunkSize to propigate up to a XMPP server, perhaps across to another
+ - server, and back down to us. On the other hand, other XMPP pushes can be
+ - delayed for running until the timeout is reached, so it should not be
+ - excessive.
+ -}
+xmppTimeout :: Int
+xmppTimeout = 120000000 -- 120 seconds
+
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
makeXMPPGitRemote buddy (baseJID jid) u
@@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
sendNetMessage $ Pushing cid $
SendPackOutput seqnum' b
toxmpp seqnum' inh
- fromxmpp outh controlh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage SendPack
- case m of
- (Just (Pushing _ (ReceivePackOutput _ b))) ->
- liftIO $ writeChunk outh b
- (Just (Pushing _ (ReceivePackDone exitcode))) ->
- liftIO $ do
- hPrint controlh exitcode
- hFlush controlh
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git receive-pack output via XMPP"]
- -- Send a synthetic exit code to git-annex
- -- xmppgit, which will exit and cause git push
- -- to die.
- liftIO $ do
- hPrint controlh (ExitFailure 1)
- hFlush controlh
+
+ fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
+ where
+ handle (Just (Pushing _ (ReceivePackOutput _ b))) =
+ liftIO $ writeChunk outh b
+ handle (Just (Pushing _ (ReceivePackDone exitcode))) =
+ liftIO $ do
+ hPrint controlh exitcode
+ hFlush controlh
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git receive-pack output via XMPP"]
+ -- Send a synthetic exit code to git-annex
+ -- xmppgit, which will exit and cause git push
+ -- to die.
+ liftIO $ do
+ hPrint controlh (ExitFailure 1)
+ hFlush controlh
+
installwrapper tmpdir = liftIO $ do
createDirectoryIfMissing True tmpdir
let wrapper = tmpdir </> "git-remote-xmpp"
@@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
, "exec " ++ program ++ " xmppgit"
]
modifyFileMode wrapper $ addModes executeModes
+
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
- dir (ie, not on a crippled filesystem where we can't make
- the wrapper executable). -}
@@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
return $ tmp </> "xmppgit"
Just d -> return $ d </> "xmppgit"
-
type EnvVar = String
@@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh
- relayfromxmpp inh = forever $ do
- m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack
- case m of
- (Just (Pushing _ (SendPackOutput _ b))) ->
- liftIO $ writeChunk inh b
- (Just _) -> noop
- Nothing -> do
- debug ["timeout waiting for git send-pack output via XMPP"]
- -- closing the handle will make
- -- git receive-pack exit
- liftIO $ do
- hClose inh
- killThread =<< myThreadId
+ relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
+ where
+ handle (Just (Pushing _ (SendPackOutput _ b))) =
+ liftIO $ writeChunk inh b
+ handle (Just _) = noop
+ handle Nothing = do
+ debug ["timeout waiting for git send-pack output via XMPP"]
+ -- closing the handle will make git receive-pack exit
+ liftIO $ do
+ hClose inh
+ killThread =<< myThreadId
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
@@ -310,18 +325,33 @@ writeChunk h b = do
B.hPut h b
hFlush h
-{- Largest chunk of data to send in a single XMPP message. -}
-chunkSize :: Int
-chunkSize = 4096
-
-{- How long to wait for an expected message before assuming the other side
- - has gone away and canceling a push.
+{- Gets NetMessages for a PushSide, ensures they are in order,
+ - and runs an action to handle each in turn. The action will be passed
+ - Nothing on timeout.
-
- - This needs to be long enough to allow a message of up to 2+ times
- - chunkSize to propigate up to a XMPP server, perhaps across to another
- - server, and back down to us. On the other hand, other XMPP pushes can be
- - delayed for running until the timeout is reached, so it should not be
- - excessive.
+ - Does not currently reorder messages, but does ensure that any
+ - duplicate messages, or messages not in the sequence, are discarded.
-}
-xmppTimeout :: Int
-xmppTimeout = 120000000 -- 120 seconds
+withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
+withPushMessagesInSequence side a = loop 0
+ where
+ loop seqnum = do
+ m <- timeout xmppTimeout <~> waitNetPushMessage side
+ let go s = a m >> loop s
+ case extractSequence =<< m of
+ Just seqnum'
+ | seqnum' == seqnum + 1 -> go seqnum'
+ | seqnum' == 0 -> go seqnum
+ | seqnum' == seqnum -> do
+ debug ["ignoring duplicate sequence number", show seqnum]
+ loop seqnum
+ | otherwise -> do
+ debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum]
+ loop seqnum
+ Nothing -> go seqnum
+
+extractSequence :: NetMessage -> Maybe Int
+extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
+extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
+extractSequence _ = Nothing
+