From 1d9ba9d9459b44c306b7c4f261b84bad38185109 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 8 Dec 2016 19:56:02 -0400 Subject: update progress logs in remotedaemon send/receive --- P2P/Annex.hs | 57 ++++++++++++++++++++++++++++++++++----------------------- P2P/Protocol.hs | 43 ++++++++++++++++++------------------------- 2 files changed, 52 insertions(+), 48 deletions(-) (limited to 'P2P') diff --git a/P2P/Annex.hs b/P2P/Annex.hs index 3b95c8c02..93da9e69b 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -20,9 +20,9 @@ import P2P.Protocol import P2P.IO import Logs.Location import Types.NumCopies +import Utility.Metered import Control.Monad.Free -import qualified Data.ByteString.Lazy as L -- When we're serving a peer, we know their uuid, and can use it to update -- transfer logs. @@ -52,35 +52,33 @@ runLocal runmode runner a = case a of let getsize = liftIO . catchMaybeIO . getFileSize size <- inAnnex' isJust Nothing getsize k runner (next (Len <$> size)) - -- TODO transfer log not updated - ReadContent k af (Offset o) next -> do + ReadContent k af o sender next -> do v <- tryNonAsync $ prepSendAnnex k case v of - -- The check can detect a problem after the - -- content is sent, but we don't use it. - -- Instead, the receiving peer must AlwaysVerify - -- the content it receives. + -- The check can detect if the file + -- changed while it was transferred, but we don't + -- use it. Instead, the receiving peer must + -- AlwaysVerify the content it receives. Right (Just (f, _check)) -> do - v' <- tryNonAsync $ -- transfer upload k af $ - liftIO $ do - h <- openBinaryFile f ReadMode - when (o /= 0) $ - hSeek h AbsoluteSeek o - L.hGetContents h + v' <- tryNonAsync $ + transfer upload k af $ + sinkfile f o sender case v' of Left e -> return (Left (show e)) - Right b -> runner (next b) - Right Nothing -> return (Left "content not available") + Right (Left e) -> return (Left (show e)) + Right (Right ok) -> runner (next ok) + -- content not available + Right Nothing -> runner (next False) Left e -> return (Left (show e)) StoreContent k af o l getb next -> do ok <- flip catchNonAsync (const $ return False) $ - transfer download k af $ + transfer download k af $ \p -> getViaTmp AlwaysVerify k $ \tmp -> - unVerified $ storefile tmp o l getb + unVerified $ storefile tmp o l getb p runner (next ok) StoreContentTo dest o l getb next -> do ok <- flip catchNonAsync (const $ return False) $ - storefile dest o l getb + storefile dest o l getb nullMeterUpdate runner (next ok) SetPresent k u next -> do v <- tryNonAsync $ logChange k u InfoPresent @@ -116,18 +114,31 @@ runLocal runmode runner a = case a of transfer mk k af ta = case runmode of -- Update transfer logs when serving. Serving theiruuid -> - mk theiruuid k af noRetry (const ta) noNotification + mk theiruuid k af noRetry ta noNotification -- Transfer logs are updated higher in the stack when -- a client. - Client -> ta - storefile dest (Offset o) (Len l) getb = do + Client -> ta nullMeterUpdate + + storefile dest (Offset o) (Len l) getb p = do + let p' = offsetMeterUpdate p (toBytesProcessed o) v <- runner getb case v of Right b -> liftIO $ do withBinaryFile dest ReadWriteMode $ \h -> do when (o /= 0) $ hSeek h AbsoluteSeek o - L.hPut h b - sz <- liftIO $ getFileSize dest + meteredWrite p' h b + sz <- getFileSize dest return (toInteger sz == l + o) Left e -> error e + + sinkfile f (Offset o) sender p = bracket setup cleanup go + where + setup = liftIO $ openBinaryFile f ReadMode + cleanup = liftIO . hClose + go h = do + let p' = offsetMeterUpdate p (toBytesProcessed o) + when (o /= 0) $ + liftIO $ hSeek h AbsoluteSeek o + b <- liftIO $ hGetContentsMetered h p' + runner (sender b) diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 5d6c6fcc5..03c7c70cf 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -197,9 +197,10 @@ data LocalF c | ContentSize Key (Maybe Len -> c) -- ^ Gets size of the content of a key, when the full content is -- present. - | ReadContent Key AssociatedFile Offset (L.ByteString -> c) - -- ^ Lazily reads the content of a key. Note that the content - -- may change while it's being sent. + | ReadContent Key AssociatedFile Offset (L.ByteString -> Proto Bool) (Bool -> c) + -- ^ Reads the content of a key and sends it to the callback. + -- Note that the content may change while it's being sent. + -- If the content is not available, sends L.empty to the callback. | StoreContent Key AssociatedFile Offset Len (Proto L.ByteString) (Bool -> c) -- ^ Stores content to the key's temp file starting at an offset. -- Once the whole content of the key has been stored, moves the @@ -381,12 +382,20 @@ serveAuthed myuuid = void $ serverLoop handler handler _ = return ServerUnexpected sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool -sendContent key af offset@(Offset n) p = do - let p' = offsetMeterUpdate p (toBytesProcessed n) - (len, content) <- readContentLen key af offset - net $ sendMessage (DATA len) - net $ sendBytes len content p' - checkSuccess +sendContent key af offset@(Offset n) p = go =<< local (contentSize key) + where + go Nothing = sender (Len 0) L.empty + go (Just (Len totallen)) = do + let len = totallen - n + if len <= 0 + then sender (Len 0) L.empty + else local $ readContent key af offset $ + sender (Len len) + sender len content = do + let p' = offsetMeterUpdate p (toBytesProcessed n) + net $ sendMessage (DATA len) + net $ sendBytes len content p' + checkSuccess receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool receiveContent p sizer storer mkmsg = do @@ -419,22 +428,6 @@ sendSuccess :: Bool -> Proto () sendSuccess True = net $ sendMessage SUCCESS sendSuccess False = net $ sendMessage FAILURE --- Reads content from an offset. The Len should correspond to --- the length of the ByteString, but to avoid buffering the content --- in memory, is gotten using contentSize. -readContentLen :: Key -> AssociatedFile -> Offset -> Proto (Len, L.ByteString) -readContentLen key af (Offset offset) = go =<< local (contentSize key) - where - go Nothing = return (Len 0, L.empty) - go (Just (Len totallen)) = do - let len = totallen - offset - if len <= 0 - then return (Len 0, L.empty) - else do - content <- local $ - readContent key af (Offset offset) - return (Len len, content) - connect :: Service -> Handle -> Handle -> Proto ExitCode connect service hin hout = do net $ sendMessage (CONNECT service) -- cgit v1.2.3