-rw-r--r-- | Remote/Helper/Chunked.hs | 104
-rw-r--r-- | Utility/Metered.hs       |  16
2 files changed, 119 insertions, 1 deletion
diff --git a/Remote/Helper/Chunked.hs b/Remote/Helper/Chunked.hs
index 031ff63d6..e98400100 100644
--- a/Remote/Helper/Chunked.hs
+++ b/Remote/Helper/Chunked.hs
@@ -9,16 +9,21 @@ module Remote.Helper.Chunked (
 	ChunkSize
 	, ChunkConfig(..)
 	, chunkConfig
+	, storeChunks
+	, chunkKeys
 	, meteredWriteFileChunks
 ) where
 
 import Common.Annex
 import Utility.DataUnits
 import Types.Remote
-import Logs.Chunk.Pure (ChunkSize)
+import Types.Key
+import Logs.Chunk.Pure (ChunkSize, ChunkCount)
+import Logs.Chunk
 import Utility.Metered
 
 import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString as S
 import qualified Data.Map as M
 
 data ChunkConfig
@@ -38,6 +43,103 @@ chunkConfig m =
 		Just size | size > 0 -> fromInteger size
 		_ -> error ("bad " ++ f)
 
+-- An infinite stream of chunk keys, starting from chunk 1.
+newtype ChunkKeyStream = ChunkKeyStream [Key]
+
+chunkKeyStream :: Key -> ChunkSize -> ChunkKeyStream
+chunkKeyStream basek chunksize = ChunkKeyStream $ map mk [1..]
+  where
+	mk chunknum = sizedk { keyChunkNum = Just chunknum }
+	sizedk = basek { keyChunkSize = Just (toInteger chunksize) }
+
+nextChunkKeyStream :: ChunkKeyStream -> (Key, ChunkKeyStream)
+nextChunkKeyStream (ChunkKeyStream (k:l)) = (k, ChunkKeyStream l)
+nextChunkKeyStream (ChunkKeyStream []) = undefined -- stream is infinite!
+
+takeChunkKeyStream :: ChunkCount -> ChunkKeyStream -> [Key]
+takeChunkKeyStream n (ChunkKeyStream l) = genericTake n l
+
+-- Number of chunks already consumed from the stream.
+numChunks :: ChunkKeyStream -> Integer
+numChunks = pred . fromJust . keyChunkNum . fst . nextChunkKeyStream
+
+{- Splits up the key's content into chunks, passing each chunk to
+ - the storer action, along with a unique chunk key.
+ -
+ - Note that the storer action is responsible for catching any
+ - exceptions it may encounter.
+ -
+ - A progress meter display is set up, and the storer action
+ - is passed a callback to update it.
+ -
+ - Once all chunks are successfully stored, updates the chunk log.
+ -}
+storeChunks :: UUID -> ChunkConfig -> Key -> FilePath -> MeterUpdate -> (Key -> L.ByteString -> MeterUpdate -> Annex Bool) -> Annex Bool
+storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
+	either (\e -> liftIO (print e) >> return False) (go meterupdate)
+		=<< (liftIO $ tryIO $ L.readFile f)
+  where
+	go meterupdate b = case chunkconfig of
+		(UnpaddedChunks chunksize) | not (isChunkKey k) ->
+			gochunks meterupdate chunksize b (chunkKeyStream k chunksize)
+		_ -> storer k b meterupdate
+
+	gochunks :: MeterUpdate -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
+	gochunks meterupdate chunksize lb =
+		loop zeroBytesProcessed chunksize (L.toChunks lb) []
+	  where
+		loop bytesprocessed sz [] c chunkkeys
+			-- Always store at least one chunk,
+			-- even for empty content.
+			| not (null c) || numchunks == 0 =
+				storechunk bytesprocessed sz [] c chunkkeys
+			| otherwise = do
+				chunksStored u k chunksize numchunks
+				return True
+		  where
+			numchunks = numChunks chunkkeys
+		loop bytesprocessed sz (b:bs) c chunkkeys
+			| s <= sz || sz == chunksize =
+				loop bytesprocessed sz' bs (b:c) chunkkeys
+			| otherwise =
+				storechunk bytesprocessed sz' bs (b:c) chunkkeys
+		  where
+			s = fromIntegral (S.length b)
+			sz' = sz - s
+
+		storechunk bytesprocessed sz bs c chunkkeys = do
+			let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
+			ifM (storer chunkkey (L.fromChunks $ reverse c) meterupdate')
+				( do
+					let bytesprocessed' = addBytesProcessed bytesprocessed (chunksize - sz)
+					loop bytesprocessed' chunksize bs [] chunkkeys'
+				, return False
+				)
+		  where
+			{- The MeterUpdate that is passed to the action
+			 - storing a chunk is offset, so that it reflects
+			 - the total bytes that have already been stored
+			 - in previous chunks. -}
+			meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
+
+-- retrieveChunks :: UUID -> ChunkConfig -> Key -> Annex
+
+{- A key can be stored in a remote unchunked, or as a list of chunked keys.
+ - It's even possible for a remote to have the same key stored multiple
+ - times with different chunk sizes. This finds all possible lists of keys
+ - that might be on the remote that can be combined to get back the
+ - requested key.
+ -}
+chunkKeys :: UUID -> ChunkConfig -> Key -> Annex [[Key]]
+chunkKeys u (UnpaddedChunks _) k = do
+	chunklists <- map (toChunkList k) <$> getCurrentChunks u k
+	return ([k]:chunklists)
+chunkKeys _ _ k = pure [[k]]
+
+toChunkList :: Key -> (ChunkSize, ChunkCount) -> [Key]
+toChunkList k (chunksize, chunkcount) = takeChunkKeyStream chunkcount $
+	chunkKeyStream k chunksize
+
 {- Writes a series of chunks to a file. The feeder is called to get
  - each chunk.
  -}
 meteredWriteFileChunks :: MeterUpdate -> FilePath -> [v] -> (v -> IO L.ByteString) -> IO ()
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
index 0d94c1c5c..bca7f58e7 100644
--- a/Utility/Metered.hs
+++ b/Utility/Metered.hs
@@ -16,6 +16,7 @@ import qualified Data.ByteString as S
 import System.IO.Unsafe
 import Foreign.Storable (Storable(sizeOf))
 import System.Posix.Types
+import Data.Int
 
 {- An action that can be run repeatedly, updating it
  - on the bytes processed.
 -
@@ -31,6 +32,10 @@ class AsBytesProcessed a where
 	toBytesProcessed :: a -> BytesProcessed
 	fromBytesProcessed :: BytesProcessed -> a
 
+instance AsBytesProcessed BytesProcessed where
+	toBytesProcessed = id
+	fromBytesProcessed = id
+
 instance AsBytesProcessed Integer where
 	toBytesProcessed i = BytesProcessed i
 	fromBytesProcessed (BytesProcessed i) = i
@@ -39,6 +44,10 @@ instance AsBytesProcessed Int where
 	toBytesProcessed i = BytesProcessed $ toInteger i
 	fromBytesProcessed (BytesProcessed i) = fromInteger i
 
+instance AsBytesProcessed Int64 where
+	toBytesProcessed i = BytesProcessed $ toInteger i
+	fromBytesProcessed (BytesProcessed i) = fromInteger i
+
 instance AsBytesProcessed FileOffset where
 	toBytesProcessed sz = BytesProcessed $ toInteger sz
 	fromBytesProcessed (BytesProcessed sz) = fromInteger sz
@@ -77,6 +86,13 @@ meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
 meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
 	meteredWrite meterupdate h b
 
+{- Applies an offset to a MeterUpdate. This can be useful when
+ - performing a sequence of actions, such as multiple meteredWriteFiles,
+ - that all update a common meter progressively.
+ -}
+offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
+offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)
+
 {- This is like L.hGetContents, but after each chunk is read, a meter
  - is updated based on the size of the chunk.
  -
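
A few notes on the new code, with standalone sketches. First, the chunk key scheme: every chunk key is just the base key with its keyChunkSize and keyChunkNum fields set, so the full list of chunk keys is derivable from the base key plus one (size, count) pair, and numChunks can recover how many chunks were consumed by peeking at the next key's chunk number. Below is a minimal sketch of the idea; the Key record is a simplified stand-in for git-annex's real Key type, which carries more fields, and ChunkSize/ChunkCount are inlined as Integer.

import Data.List (genericTake)
import Data.Maybe (fromJust)

-- Simplified stand-in for git-annex's Key.
data Key = Key
    { keyName :: String
    , keyChunkSize :: Maybe Integer
    , keyChunkNum :: Maybe Integer
    } deriving Show

newtype ChunkKeyStream = ChunkKeyStream [Key]

-- Derive an infinite stream of chunk keys from a base key, starting at 1.
chunkKeyStream :: Key -> Integer -> ChunkKeyStream
chunkKeyStream basek chunksize = ChunkKeyStream (map mk [1 ..])
  where
    mk n = basek { keyChunkSize = Just chunksize, keyChunkNum = Just n }

nextChunkKeyStream :: ChunkKeyStream -> (Key, ChunkKeyStream)
nextChunkKeyStream (ChunkKeyStream (k : l)) = (k, ChunkKeyStream l)
nextChunkKeyStream (ChunkKeyStream []) = error "unreachable: the stream is infinite"

takeChunkKeyStream :: Integer -> ChunkKeyStream -> [Key]
takeChunkKeyStream n (ChunkKeyStream l) = genericTake n l

-- The chunk number of the next key reveals how many were already consumed.
numChunks :: ChunkKeyStream -> Integer
numChunks = pred . fromJust . keyChunkNum . fst . nextChunkKeyStream

main :: IO ()
main = do
    let s0 = chunkKeyStream (Key "SHA256--foo" Nothing Nothing) 1048576
        (k1, s1) = nextChunkKeyStream s0
        (k2, s2) = nextChunkKeyStream s1
    print (keyChunkNum k1, keyChunkNum k2) -- (Just 1,Just 2)
    print (numChunks s2)                   -- 2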
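Second, the regrouping done by gochunks: it walks the strict pieces produced by L.toChunks, consing each piece onto an accumulator and reversing once per chunk, avoiding repeated list appends. The guard s <= sz || sz == chunksize also accepts a piece when nothing has been accumulated yet, so a piece larger than the chunk size becomes a single oversized chunk; and because a straddling piece is stored with the chunk being closed, chunks can overrun the configured size by up to one piece. Here is a pure sketch of just the regrouping; the rechunk helper is hypothetical, and the real loop additionally stores each chunk and meters progress.

import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as C8
import qualified Data.ByteString.Lazy as L

rechunk :: Int -> [S.ByteString] -> [L.ByteString]
rechunk _ [] = [L.empty] -- at least one chunk, even for empty content
rechunk chunksize ps = go chunksize [] ps
  where
    -- sz is the space still wanted in the current chunk; c accumulates
    -- pieces in reverse order (cheap cons instead of append).
    go _ [] [] = []
    go _ c []  = [flush c]
    go sz c (b : bs)
        | s <= sz || sz == chunksize = go (sz - s) (b : c) bs
        | otherwise = flush (b : c) : go chunksize [] bs -- close chunk, straddler included
      where
        s = S.length b
    flush = L.fromChunks . reverse

main :: IO ()
main = mapM_ print (rechunk 4 (map C8.pack ["ab", "cde", "fg", "h"]))
-- prints "abcde" then "fgh": the first chunk overruns by one piece.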
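Third, retrieval via chunkKeys. Since a remote may hold the same key unchunked and also chunked at several sizes, retrieval gets a list of candidate key lists: the unchunked key first, then one list per (chunksize, chunkcount) pair recorded in the chunk log. A pure sketch follows, modeling the chunk log as a plain list (the real getCurrentChunks consults the log in the Annex monad) and rendering keys as strings for brevity; chunkKeyName is a hypothetical display helper, not the real chunk key encoding.

-- Chunk N of size S of key K is shown as K ++ "/S/N".
chunkKeyName :: String -> Integer -> Integer -> String
chunkKeyName k size n = k ++ "/" ++ show size ++ "/" ++ show n

toChunkList :: String -> (Integer, Integer) -> [String]
toChunkList k (size, count) = [chunkKeyName k size n | n <- [1 .. count]]

-- All candidate key lists: the unchunked key first, then one list per
-- (size, count) pair recorded in the chunk log for this remote.
chunkKeys :: String -> [(Integer, Integer)] -> [[String]]
chunkKeys k logged = [k] : map (toChunkList k) logged

main :: IO ()
main = mapM_ print (chunkKeys "SHA256--foo" [(1048576, 3), (2097152, 2)])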
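Finally, the Utility/Metered.hs changes fit together with storechunk's meterupdate'. A MeterUpdate takes a BytesProcessed, so offsetMeterUpdate adds two BytesProcessed values, which seems to be what the new identity AsBytesProcessed BytesProcessed instance is for, addBytesProcessed being polymorphic in its second argument; the Int64 instance presumably covers Int64 quantities such as lazy ByteString lengths. A minimal model, assuming a simplified monomorphic addBytesProcessed in place of the real polymorphic one:

newtype BytesProcessed = BytesProcessed Integer

type MeterUpdate = BytesProcessed -> IO ()

addBytesProcessed :: BytesProcessed -> BytesProcessed -> BytesProcessed
addBytesProcessed (BytesProcessed a) (BytesProcessed b) = BytesProcessed (a + b)

-- Shift a meter so a sub-action's relative positions are reported as
-- absolute positions past what earlier actions already processed.
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)

main :: IO ()
main = do
    let meter (BytesProcessed n) = putStrLn ("bytes so far: " ++ show n)
    -- Chunk 1 reports absolute positions directly.
    mapM_ (meter . BytesProcessed) [1024, 2048, 3072]
    -- Chunk 2's meter is offset by the 3072 bytes already stored, so its
    -- chunk-relative positions appear as running totals on the meter.
    let meter2 = offsetMeterUpdate meter (BytesProcessed 3072)
    mapM_ (meter2 . BytesProcessed) [1024, 2048]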