summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Annex/Transfer.hs5
-rw-r--r--P2P/Annex.hs57
-rw-r--r--P2P/Protocol.hs43
-rw-r--r--doc/todo/tor.mdwn1
4 files changed, 57 insertions, 49 deletions
diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs
index 323600e96..b33dace4a 100644
--- a/Annex/Transfer.hs
+++ b/Annex/Transfer.hs
@@ -45,6 +45,11 @@ instance Observable (Bool, Verification) where
observeBool = fst
observeFailure = (False, UnVerified)
+instance Observable (Either e Bool) where
+ observeBool (Left _) = False
+ observeBool (Right b) = b
+ observeFailure = Right False
+
upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
upload u key f d a _witness = guardHaveUUID u $
runTransfer (Transfer Upload u key) f d a
diff --git a/P2P/Annex.hs b/P2P/Annex.hs
index 3b95c8c02..93da9e69b 100644
--- a/P2P/Annex.hs
+++ b/P2P/Annex.hs
@@ -20,9 +20,9 @@ import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
+import Utility.Metered
import Control.Monad.Free
-import qualified Data.ByteString.Lazy as L
-- When we're serving a peer, we know their uuid, and can use it to update
-- transfer logs.
@@ -52,35 +52,33 @@ runLocal runmode runner a = case a of
let getsize = liftIO . catchMaybeIO . getFileSize
size <- inAnnex' isJust Nothing getsize k
runner (next (Len <$> size))
- -- TODO transfer log not updated
- ReadContent k af (Offset o) next -> do
+ ReadContent k af o sender next -> do
v <- tryNonAsync $ prepSendAnnex k
case v of
- -- The check can detect a problem after the
- -- content is sent, but we don't use it.
- -- Instead, the receiving peer must AlwaysVerify
- -- the content it receives.
+ -- The check can detect if the file
+ -- changed while it was transferred, but we don't
+ -- use it. Instead, the receiving peer must
+ -- AlwaysVerify the content it receives.
Right (Just (f, _check)) -> do
- v' <- tryNonAsync $ -- transfer upload k af $
- liftIO $ do
- h <- openBinaryFile f ReadMode
- when (o /= 0) $
- hSeek h AbsoluteSeek o
- L.hGetContents h
+ v' <- tryNonAsync $
+ transfer upload k af $
+ sinkfile f o sender
case v' of
Left e -> return (Left (show e))
- Right b -> runner (next b)
- Right Nothing -> return (Left "content not available")
+ Right (Left e) -> return (Left (show e))
+ Right (Right ok) -> runner (next ok)
+ -- content not available
+ Right Nothing -> runner (next False)
Left e -> return (Left (show e))
StoreContent k af o l getb next -> do
ok <- flip catchNonAsync (const $ return False) $
- transfer download k af $
+ transfer download k af $ \p ->
getViaTmp AlwaysVerify k $ \tmp ->
- unVerified $ storefile tmp o l getb
+ unVerified $ storefile tmp o l getb p
runner (next ok)
StoreContentTo dest o l getb next -> do
ok <- flip catchNonAsync (const $ return False) $
- storefile dest o l getb
+ storefile dest o l getb nullMeterUpdate
runner (next ok)
SetPresent k u next -> do
v <- tryNonAsync $ logChange k u InfoPresent
@@ -116,18 +114,31 @@ runLocal runmode runner a = case a of
transfer mk k af ta = case runmode of
-- Update transfer logs when serving.
Serving theiruuid ->
- mk theiruuid k af noRetry (const ta) noNotification
+ mk theiruuid k af noRetry ta noNotification
-- Transfer logs are updated higher in the stack when
-- a client.
- Client -> ta
- storefile dest (Offset o) (Len l) getb = do
+ Client -> ta nullMeterUpdate
+
+ storefile dest (Offset o) (Len l) getb p = do
+ let p' = offsetMeterUpdate p (toBytesProcessed o)
v <- runner getb
case v of
Right b -> liftIO $ do
withBinaryFile dest ReadWriteMode $ \h -> do
when (o /= 0) $
hSeek h AbsoluteSeek o
- L.hPut h b
- sz <- liftIO $ getFileSize dest
+ meteredWrite p' h b
+ sz <- getFileSize dest
return (toInteger sz == l + o)
Left e -> error e
+
+ sinkfile f (Offset o) sender p = bracket setup cleanup go
+ where
+ setup = liftIO $ openBinaryFile f ReadMode
+ cleanup = liftIO . hClose
+ go h = do
+ let p' = offsetMeterUpdate p (toBytesProcessed o)
+ when (o /= 0) $
+ liftIO $ hSeek h AbsoluteSeek o
+ b <- liftIO $ hGetContentsMetered h p'
+ runner (sender b)
diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs
index 5d6c6fcc5..03c7c70cf 100644
--- a/P2P/Protocol.hs
+++ b/P2P/Protocol.hs
@@ -197,9 +197,10 @@ data LocalF c
| ContentSize Key (Maybe Len -> c)
-- ^ Gets size of the content of a key, when the full content is
-- present.
- | ReadContent Key AssociatedFile Offset (L.ByteString -> c)
- -- ^ Lazily reads the content of a key. Note that the content
- -- may change while it's being sent.
+ | ReadContent Key AssociatedFile Offset (L.ByteString -> Proto Bool) (Bool -> c)
+ -- ^ Reads the content of a key and sends it to the callback.
+ -- Note that the content may change while it's being sent.
+ -- If the content is not available, sends L.empty to the callback.
| StoreContent Key AssociatedFile Offset Len (Proto L.ByteString) (Bool -> c)
-- ^ Stores content to the key's temp file starting at an offset.
-- Once the whole content of the key has been stored, moves the
@@ -381,12 +382,20 @@ serveAuthed myuuid = void $ serverLoop handler
handler _ = return ServerUnexpected
sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
-sendContent key af offset@(Offset n) p = do
- let p' = offsetMeterUpdate p (toBytesProcessed n)
- (len, content) <- readContentLen key af offset
- net $ sendMessage (DATA len)
- net $ sendBytes len content p'
- checkSuccess
+sendContent key af offset@(Offset n) p = go =<< local (contentSize key)
+ where
+ go Nothing = sender (Len 0) L.empty
+ go (Just (Len totallen)) = do
+ let len = totallen - n
+ if len <= 0
+ then sender (Len 0) L.empty
+ else local $ readContent key af offset $
+ sender (Len len)
+ sender len content = do
+ let p' = offsetMeterUpdate p (toBytesProcessed n)
+ net $ sendMessage (DATA len)
+ net $ sendBytes len content p'
+ checkSuccess
receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
receiveContent p sizer storer mkmsg = do
@@ -419,22 +428,6 @@ sendSuccess :: Bool -> Proto ()
sendSuccess True = net $ sendMessage SUCCESS
sendSuccess False = net $ sendMessage FAILURE
--- Reads content from an offset. The Len should correspond to
--- the length of the ByteString, but to avoid buffering the content
--- in memory, is gotten using contentSize.
-readContentLen :: Key -> AssociatedFile -> Offset -> Proto (Len, L.ByteString)
-readContentLen key af (Offset offset) = go =<< local (contentSize key)
- where
- go Nothing = return (Len 0, L.empty)
- go (Just (Len totallen)) = do
- let len = totallen - offset
- if len <= 0
- then return (Len 0, L.empty)
- else do
- content <- local $
- readContent key af (Offset offset)
- return (Len len, content)
-
connect :: Service -> Handle -> Handle -> Proto ExitCode
connect service hin hout = do
net $ sendMessage (CONNECT service)
diff --git a/doc/todo/tor.mdwn b/doc/todo/tor.mdwn
index 79822ab19..01fa2635a 100644
--- a/doc/todo/tor.mdwn
+++ b/doc/todo/tor.mdwn
@@ -8,7 +8,6 @@ Current todo list:
object is already in progress, the message about this is output by the
remotedaemon --debug, but not forwarded to the peer, which shows
"Connection reset by peer"
-* update progress logs in remotedaemon send/receive
* Think about locking some more. What happens if the connection to the peer
is dropped while we think we're locking content there from being dropped?