-rw-r--r--   Remote/Helper/Chunked.hs   104
-rw-r--r--   Utility/Metered.hs          16
2 files changed, 119 insertions(+), 1 deletion(-)
diff --git a/Remote/Helper/Chunked.hs b/Remote/Helper/Chunked.hs
index 031ff63d6..e98400100 100644
--- a/Remote/Helper/Chunked.hs
+++ b/Remote/Helper/Chunked.hs
@@ -9,16 +9,21 @@ module Remote.Helper.Chunked
( ChunkSize
, ChunkConfig(..)
, chunkConfig
+ , storeChunks
+ , chunkKeys
, meteredWriteFileChunks
) where

import Common.Annex
import Utility.DataUnits
import Types.Remote
-import Logs.Chunk.Pure (ChunkSize)
+import Types.Key
+import Logs.Chunk.Pure (ChunkSize, ChunkCount)
+import Logs.Chunk
import Utility.Metered

import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString as S
import qualified Data.Map as M

data ChunkConfig
@@ -38,6 +43,103 @@ chunkConfig m =
Just size | size > 0 -> fromInteger size
_ -> error ("bad " ++ f)

+-- An infinite stream of chunk keys, starting from chunk 1.
+newtype ChunkKeyStream = ChunkKeyStream [Key]
+
+chunkKeyStream :: Key -> ChunkSize -> ChunkKeyStream
+chunkKeyStream basek chunksize = ChunkKeyStream $ map mk [1..]
+ where
+ mk chunknum = sizedk { keyChunkNum = Just chunknum }
+ sizedk = basek { keyChunkSize = Just (toInteger chunksize) }
+
+nextChunkKeyStream :: ChunkKeyStream -> (Key, ChunkKeyStream)
+nextChunkKeyStream (ChunkKeyStream (k:l)) = (k, ChunkKeyStream l)
+nextChunkKeyStream (ChunkKeyStream []) = undefined -- stream is infinite!
+
+takeChunkKeyStream :: ChunkCount -> ChunkKeyStream -> [Key]
+takeChunkKeyStream n (ChunkKeyStream l) = genericTake n l
+
+-- Number of chunks already consumed from the stream.
+numChunks :: ChunkKeyStream -> Integer
+numChunks = pred . fromJust . keyChunkNum . fst . nextChunkKeyStream
+
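
[The three helpers above are the whole chunk-numbering scheme. A minimal standalone sketch of their behaviour, using a simplified stand-in for git-annex's Key record (the real Types.Key has more fields):

import Data.List (genericTake)

-- Simplified stand-in for Types.Key, just enough for the stream.
data Key = Key
    { keyName :: String
    , keyChunkSize :: Maybe Integer
    , keyChunkNum :: Maybe Integer
    } deriving Show

newtype ChunkKeyStream = ChunkKeyStream [Key]

chunkKeyStream :: Key -> Integer -> ChunkKeyStream
chunkKeyStream basek chunksize = ChunkKeyStream $ map mk [1..]
  where
    mk chunknum = sizedk { keyChunkNum = Just chunknum }
    sizedk = basek { keyChunkSize = Just chunksize }

main :: IO ()
main = do
    let ChunkKeyStream l = chunkKeyStream (Key "SHA256--f00" Nothing Nothing) (1024 * 1024)
    -- Chunk numbers count from 1; every key carries the chunk size.
    mapM_ print (genericTake 3 l)
]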
+{- Splits up the key's content into chunks, passing each chunk to
+ - the storer action, along with a unique chunk key.
+ -
+ - Note that the storer action is responsible for catching any
+ - exceptions it may encounter.
+ -
+ - A progress meter display is set up, and the storer action
+ - is passed a callback to update it.
+ -
+ - Once all chunks are successfully stored, updates the chunk log.
+ -}
+storeChunks :: UUID -> ChunkConfig -> Key -> FilePath -> MeterUpdate -> (Key -> L.ByteString -> MeterUpdate -> Annex Bool) -> Annex Bool
+storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
+ either (\e -> liftIO (print e) >> return False) (go meterupdate)
+ =<< (liftIO $ tryIO $ L.readFile f)
+ where
+ go meterupdate b = case chunkconfig of
+ (UnpaddedChunks chunksize) | not (isChunkKey k) ->
+ gochunks meterupdate chunksize b (chunkKeyStream k chunksize)
+ _ -> storer k b meterupdate
+
+ gochunks :: MeterUpdate -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
+ gochunks meterupdate chunksize lb =
+ loop zeroBytesProcessed chunksize (L.toChunks lb) []
+ where
+ loop bytesprocessed sz [] c chunkkeys
+ -- Always store at least one chunk,
+ -- even for empty content.
+ | not (null c) || numchunks == 0 =
+ storechunk bytesprocessed sz [] c chunkkeys
+ | otherwise = do
+ chunksStored u k chunksize numchunks
+ return True
+ where
+ numchunks = numChunks chunkkeys
+ loop bytesprocessed sz (b:bs) c chunkkeys
+ | s <= sz || sz == chunksize =
+ loop bytesprocessed sz' bs (b:c) chunkkeys
+ | otherwise =
+ storechunk bytesprocessed sz' bs (b:c) chunkkeys
+ where
+ s = fromIntegral (S.length b)
+ sz' = sz - s
+
+ storechunk bytesprocessed sz bs c chunkkeys = do
+ let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
+ ifM (storer chunkkey (L.fromChunks $ reverse c) meterupdate')
+ ( do
+ let bytesprocessed' = addBytesProcessed bytesprocessed (chunksize - sz)
+ loop bytesprocessed' chunksize bs [] chunkkeys'
+ , return False
+ )
+ where
+ {- The MeterUpdate that is passed to the action
+ - storing a chunk is offset, so that it reflects
+ - the total bytes that have already been stored
+ - in previous chunks. -}
+ meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
+
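
[For callers, the heart of the new interface is the storer action's shape. A hedged sketch of what a directory-style remote's storer might look like; storeDir and dir are illustrative, keyFile and catchBoolIO are assumed from elsewhere in git-annex, and none of this is code from this commit:

-- Receives one chunk key, that chunk's content, and a MeterUpdate
-- already offset past the bytes stored by previous chunks.
storeDir :: FilePath -> Key -> L.ByteString -> MeterUpdate -> Annex Bool
storeDir dir chunkkey b p = liftIO $ catchBoolIO $ do
    meteredWriteFile p (dir </> keyFile chunkkey) b
    return True

-- At the call site, storeChunks handles chunking and the chunk log:
-- ok <- storeChunks u chunkconfig k srcfile p (storeDir dir)
]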
+-- retrieveChunks :: UUID -> ChunkConfig -> Key -> Annex
+
+{- A key can be stored in a remote unchunked, or as a list of chunked keys.
+ - It's even possible for a remote to have the same key stored multiple
+ - times with different chunk sizes. This finds all the lists of keys
+ - that might be on the remote; any one such list, when fully present,
+ - can be combined to get back the requested key.
+ -}
+chunkKeys :: UUID -> ChunkConfig -> Key -> Annex [[Key]]
+chunkKeys u (UnpaddedChunks _) k = do
+ chunklists <- map (toChunkList k) <$> getCurrentChunks u k
+ return ([k]:chunklists)
+chunkKeys _ _ k = pure [[k]]
+
+toChunkList :: Key -> (ChunkSize, ChunkCount) -> [Key]
+toChunkList k (chunksize, chunkcount) = takeChunkKeyStream chunkcount $
+ chunkKeyStream k chunksize
+
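
[To make the shape of chunkKeys' result concrete, a worked example with invented log contents:

-- Suppose getCurrentChunks u k reports the key was stored twice:
--   [(1048576, 3), (2097152, 2)]   -- 3 chunks of 1 MiB, 2 chunks of 2 MiB
-- Then chunkKeys u (UnpaddedChunks 1048576) k returns three candidate lists:
--   [ [k]                          -- the key may also be present unchunked
--   , [k{1MiB,#1}, k{1MiB,#2}, k{1MiB,#3}]
--   , [k{2MiB,#1}, k{2MiB,#2}]
--   ]
-- A retriever can try each list in turn until one is fully present.
]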
{- Writes a series of chunks to a file. The feeder is called to get
- each chunk. -}
meteredWriteFileChunks :: MeterUpdate -> FilePath -> [v] -> (v -> IO L.ByteString) -> IO ()
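
[Combined with chunkKeys, meteredWriteFileChunks is enough to sketch the retrieval side for a directory-style remote. Hypothetical glue, not code from this commit; keyFile again stands in for whatever maps a chunk key to its file on the remote:

retrieve :: MeterUpdate -> FilePath -> FilePath -> [Key] -> IO ()
retrieve meterupdate dir dest chunkkeys =
    meteredWriteFileChunks meterupdate dest chunkkeys $ \k ->
        L.readFile (dir </> keyFile k)
]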
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
index 0d94c1c5c..bca7f58e7 100644
--- a/Utility/Metered.hs
+++ b/Utility/Metered.hs
@@ -16,6 +16,7 @@ import qualified Data.ByteString as S
import System.IO.Unsafe
import Foreign.Storable (Storable(sizeOf))
import System.Posix.Types
+import Data.Int

{- An action that can be run repeatedly, updating it on the bytes processed.
-
@@ -31,6 +32,10 @@ class AsBytesProcessed a where
toBytesProcessed :: a -> BytesProcessed
fromBytesProcessed :: BytesProcessed -> a

+instance AsBytesProcessed BytesProcessed where
+ toBytesProcessed = id
+ fromBytesProcessed = id
+
instance AsBytesProcessed Integer where
toBytesProcessed i = BytesProcessed i
fromBytesProcessed (BytesProcessed i) = i
@@ -39,6 +44,10 @@ instance AsBytesProcessed Int where
toBytesProcessed i = BytesProcessed $ toInteger i
fromBytesProcessed (BytesProcessed i) = fromInteger i

+instance AsBytesProcessed Int64 where
+ toBytesProcessed i = BytesProcessed $ toInteger i
+ fromBytesProcessed (BytesProcessed i) = fromInteger i
+
instance AsBytesProcessed FileOffset where
toBytesProcessed sz = BytesProcessed $ toInteger sz
fromBytesProcessed (BytesProcessed sz) = fromInteger sz
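
[The Int64 instance is what lets lazy ByteString lengths (Data.ByteString.Lazy.length returns Int64) feed a meter without manual conversion. A runnable sketch of the class mechanics, mirroring the definitions above:

import Data.Int

newtype BytesProcessed = BytesProcessed Integer
    deriving Show

class AsBytesProcessed a where
    toBytesProcessed :: a -> BytesProcessed
    fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed Int64 where
    toBytesProcessed i = BytesProcessed $ toInteger i
    fromBytesProcessed (BytesProcessed i) = fromInteger i

main :: IO ()
main = do
    print (toBytesProcessed (1048576 :: Int64))               -- BytesProcessed 1048576
    print (fromBytesProcessed (BytesProcessed 512) :: Int64)  -- 512
]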
@@ -77,6 +86,13 @@ meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
meteredWrite meterupdate h b

+{- Applies an offset to a MeterUpdate. This can be useful when
+ - performing a sequence of actions, such as multiple meteredWriteFiles,
+ - that all update a common meter progressively.
+ -}
+offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
+offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)
+
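
[A runnable sketch of how offsetMeterUpdate chains sequential writes onto one meter. BytesProcessed and addBytesProcessed are reduced to monomorphic forms here; the real addBytesProcessed is polymorphic over AsBytesProcessed:

newtype BytesProcessed = BytesProcessed Integer
    deriving Show

type MeterUpdate = BytesProcessed -> IO ()

addBytesProcessed :: BytesProcessed -> BytesProcessed -> BytesProcessed
addBytesProcessed (BytesProcessed a) (BytesProcessed b) = BytesProcessed (a + b)

offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)

main :: IO ()
main = do
    let meter = print :: MeterUpdate
    meter (BytesProcessed 100)                  -- first action reports 100 bytes
    let meter' = offsetMeterUpdate meter (BytesProcessed 100)
    meter' (BytesProcessed 50)                  -- reports BytesProcessed 150
]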
{- This is like L.hGetContents, but after each chunk is read, a meter
- is updated based on the size of the chunk.
-