diff options
-rw-r--r-- | Annex/Transfer.hs | 5 | ||||
-rw-r--r-- | P2P/Annex.hs | 57 | ||||
-rw-r--r-- | P2P/Protocol.hs | 43 | ||||
-rw-r--r-- | doc/todo/tor.mdwn | 1 |
4 files changed, 57 insertions, 49 deletions
diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 323600e96..b33dace4a 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -45,6 +45,11 @@ instance Observable (Bool, Verification) where observeBool = fst observeFailure = (False, UnVerified) +instance Observable (Either e Bool) where + observeBool (Left _) = False + observeBool (Right b) = b + observeFailure = Right False + upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v upload u key f d a _witness = guardHaveUUID u $ runTransfer (Transfer Upload u key) f d a diff --git a/P2P/Annex.hs b/P2P/Annex.hs index 3b95c8c02..93da9e69b 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -20,9 +20,9 @@ import P2P.Protocol import P2P.IO import Logs.Location import Types.NumCopies +import Utility.Metered import Control.Monad.Free -import qualified Data.ByteString.Lazy as L -- When we're serving a peer, we know their uuid, and can use it to update -- transfer logs. @@ -52,35 +52,33 @@ runLocal runmode runner a = case a of let getsize = liftIO . catchMaybeIO . getFileSize size <- inAnnex' isJust Nothing getsize k runner (next (Len <$> size)) - -- TODO transfer log not updated - ReadContent k af (Offset o) next -> do + ReadContent k af o sender next -> do v <- tryNonAsync $ prepSendAnnex k case v of - -- The check can detect a problem after the - -- content is sent, but we don't use it. - -- Instead, the receiving peer must AlwaysVerify - -- the content it receives. + -- The check can detect if the file + -- changed while it was transferred, but we don't + -- use it. Instead, the receiving peer must + -- AlwaysVerify the content it receives. Right (Just (f, _check)) -> do - v' <- tryNonAsync $ -- transfer upload k af $ - liftIO $ do - h <- openBinaryFile f ReadMode - when (o /= 0) $ - hSeek h AbsoluteSeek o - L.hGetContents h + v' <- tryNonAsync $ + transfer upload k af $ + sinkfile f o sender case v' of Left e -> return (Left (show e)) - Right b -> runner (next b) - Right Nothing -> return (Left "content not available") + Right (Left e) -> return (Left (show e)) + Right (Right ok) -> runner (next ok) + -- content not available + Right Nothing -> runner (next False) Left e -> return (Left (show e)) StoreContent k af o l getb next -> do ok <- flip catchNonAsync (const $ return False) $ - transfer download k af $ + transfer download k af $ \p -> getViaTmp AlwaysVerify k $ \tmp -> - unVerified $ storefile tmp o l getb + unVerified $ storefile tmp o l getb p runner (next ok) StoreContentTo dest o l getb next -> do ok <- flip catchNonAsync (const $ return False) $ - storefile dest o l getb + storefile dest o l getb nullMeterUpdate runner (next ok) SetPresent k u next -> do v <- tryNonAsync $ logChange k u InfoPresent @@ -116,18 +114,31 @@ runLocal runmode runner a = case a of transfer mk k af ta = case runmode of -- Update transfer logs when serving. Serving theiruuid -> - mk theiruuid k af noRetry (const ta) noNotification + mk theiruuid k af noRetry ta noNotification -- Transfer logs are updated higher in the stack when -- a client. - Client -> ta - storefile dest (Offset o) (Len l) getb = do + Client -> ta nullMeterUpdate + + storefile dest (Offset o) (Len l) getb p = do + let p' = offsetMeterUpdate p (toBytesProcessed o) v <- runner getb case v of Right b -> liftIO $ do withBinaryFile dest ReadWriteMode $ \h -> do when (o /= 0) $ hSeek h AbsoluteSeek o - L.hPut h b - sz <- liftIO $ getFileSize dest + meteredWrite p' h b + sz <- getFileSize dest return (toInteger sz == l + o) Left e -> error e + + sinkfile f (Offset o) sender p = bracket setup cleanup go + where + setup = liftIO $ openBinaryFile f ReadMode + cleanup = liftIO . hClose + go h = do + let p' = offsetMeterUpdate p (toBytesProcessed o) + when (o /= 0) $ + liftIO $ hSeek h AbsoluteSeek o + b <- liftIO $ hGetContentsMetered h p' + runner (sender b) diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 5d6c6fcc5..03c7c70cf 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -197,9 +197,10 @@ data LocalF c | ContentSize Key (Maybe Len -> c) -- ^ Gets size of the content of a key, when the full content is -- present. - | ReadContent Key AssociatedFile Offset (L.ByteString -> c) - -- ^ Lazily reads the content of a key. Note that the content - -- may change while it's being sent. + | ReadContent Key AssociatedFile Offset (L.ByteString -> Proto Bool) (Bool -> c) + -- ^ Reads the content of a key and sends it to the callback. + -- Note that the content may change while it's being sent. + -- If the content is not available, sends L.empty to the callback. | StoreContent Key AssociatedFile Offset Len (Proto L.ByteString) (Bool -> c) -- ^ Stores content to the key's temp file starting at an offset. -- Once the whole content of the key has been stored, moves the @@ -381,12 +382,20 @@ serveAuthed myuuid = void $ serverLoop handler handler _ = return ServerUnexpected sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool -sendContent key af offset@(Offset n) p = do - let p' = offsetMeterUpdate p (toBytesProcessed n) - (len, content) <- readContentLen key af offset - net $ sendMessage (DATA len) - net $ sendBytes len content p' - checkSuccess +sendContent key af offset@(Offset n) p = go =<< local (contentSize key) + where + go Nothing = sender (Len 0) L.empty + go (Just (Len totallen)) = do + let len = totallen - n + if len <= 0 + then sender (Len 0) L.empty + else local $ readContent key af offset $ + sender (Len len) + sender len content = do + let p' = offsetMeterUpdate p (toBytesProcessed n) + net $ sendMessage (DATA len) + net $ sendBytes len content p' + checkSuccess receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool receiveContent p sizer storer mkmsg = do @@ -419,22 +428,6 @@ sendSuccess :: Bool -> Proto () sendSuccess True = net $ sendMessage SUCCESS sendSuccess False = net $ sendMessage FAILURE --- Reads content from an offset. The Len should correspond to --- the length of the ByteString, but to avoid buffering the content --- in memory, is gotten using contentSize. -readContentLen :: Key -> AssociatedFile -> Offset -> Proto (Len, L.ByteString) -readContentLen key af (Offset offset) = go =<< local (contentSize key) - where - go Nothing = return (Len 0, L.empty) - go (Just (Len totallen)) = do - let len = totallen - offset - if len <= 0 - then return (Len 0, L.empty) - else do - content <- local $ - readContent key af (Offset offset) - return (Len len, content) - connect :: Service -> Handle -> Handle -> Proto ExitCode connect service hin hout = do net $ sendMessage (CONNECT service) diff --git a/doc/todo/tor.mdwn b/doc/todo/tor.mdwn index 79822ab19..01fa2635a 100644 --- a/doc/todo/tor.mdwn +++ b/doc/todo/tor.mdwn @@ -8,7 +8,6 @@ Current todo list: object is already in progress, the message about this is output by the remotedaemon --debug, but not forwarded to the peer, which shows "Connection reset by peer" -* update progress logs in remotedaemon send/receive * Think about locking some more. What happens if the connection to the peer is dropped while we think we're locking content there from being dropped? |