diff options
Diffstat (limited to 'Utility')
-rw-r--r-- | Utility/Metered.hs | 116 | ||||
-rw-r--r-- | Utility/Observed.hs | 43 | ||||
-rw-r--r-- | Utility/Rsync.hs | 12 |
3 files changed, 122 insertions, 49 deletions
diff --git a/Utility/Metered.hs b/Utility/Metered.hs new file mode 100644 index 000000000..f33ad443a --- /dev/null +++ b/Utility/Metered.hs @@ -0,0 +1,116 @@ +{- Metered IO + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE TypeSynonymInstances #-} + +module Utility.Metered where + +import Common + +import qualified Data.ByteString.Lazy as L +import qualified Data.ByteString as S +import System.IO.Unsafe +import Foreign.Storable (Storable(sizeOf)) +import System.Posix.Types + +{- An action that can be run repeatedly, updating it on the bytes processed. + - + - Note that each call receives the total number of bytes processed, so + - far, *not* an incremental amount since the last call. -} +type MeterUpdate = (BytesProcessed -> IO ()) + +{- Total number of bytes processed so far. -} +newtype BytesProcessed = BytesProcessed Integer + deriving (Eq, Ord) + +class AsBytesProcessed a where + toBytesProcessed :: a -> BytesProcessed + fromBytesProcessed :: BytesProcessed -> a + +instance AsBytesProcessed Integer where + toBytesProcessed i = BytesProcessed i + fromBytesProcessed (BytesProcessed i) = i + +instance AsBytesProcessed Int where + toBytesProcessed i = BytesProcessed $ toInteger i + fromBytesProcessed (BytesProcessed i) = fromInteger i + +instance AsBytesProcessed FileOffset where + toBytesProcessed sz = BytesProcessed $ toInteger sz + fromBytesProcessed (BytesProcessed sz) = fromInteger sz + +addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed +addBytesProcessed (BytesProcessed i) v = + let (BytesProcessed n) = toBytesProcessed v + in BytesProcessed $! i + n + +zeroBytesProcessed :: BytesProcessed +zeroBytesProcessed = BytesProcessed 0 + +{- Sends the content of a file to an action, updating the meter as it's + - consumed. -} +withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a +withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h -> + hGetContentsMetered h meterupdate >>= a + +{- Sends the content of a file to a Handle, updating the meter as it's + - written. -} +streamMeteredFile :: FilePath -> MeterUpdate -> Handle -> IO () +streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h + +{- Writes a ByteString to a Handle, updating a meter as it's written. -} +meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO () +meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks + where + go _ [] = return () + go sofar (c:cs) = do + S.hPut h c + let sofar' = addBytesProcessed sofar $ S.length c + meterupdate sofar' + go sofar' cs + +meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO () +meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h -> + meteredWrite meterupdate h b + +{- This is like L.hGetContents, but after each chunk is read, a meter + - is updated based on the size of the chunk. + - + - Note that the meter update is run in unsafeInterleaveIO, which means that + - it can be run at any time. It's even possible for updates to run out + - of order, as different parts of the ByteString are consumed. + - + - All the usual caveats about using unsafeInterleaveIO apply to the + - meter updates, so use caution. + -} +hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString +hGetContentsMetered h meterupdate = lazyRead zeroBytesProcessed + where + lazyRead sofar = unsafeInterleaveIO $ loop sofar + + loop sofar = do + c <- S.hGetSome h defaultChunkSize + if S.null c + then do + hClose h + return $ L.empty + else do + let sofar' = addBytesProcessed sofar $ + S.length c + meterupdate sofar' + {- unsafeInterleaveIO causes this to be + - deferred until the data is read from the + - ByteString. -} + cs <- lazyRead sofar' + return $ L.append (L.fromChunks [c]) cs + +{- Same default chunk size Lazy ByteStrings use. -} +defaultChunkSize :: Int +defaultChunkSize = 32 * k - chunkOverhead + where + k = 1024 + chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific diff --git a/Utility/Observed.hs b/Utility/Observed.hs deleted file mode 100644 index 3ee973429..000000000 --- a/Utility/Observed.hs +++ /dev/null @@ -1,43 +0,0 @@ -module Utility.Observed where - -import qualified Data.ByteString.Lazy as L -import qualified Data.ByteString as S -import System.IO -import System.IO.Unsafe -import Foreign.Storable (Storable(sizeOf)) - -{- This is like L.hGetContents, but after each chunk is read, an action - - is run to observe the size of the chunk. - - - - Note that the observer is run in unsafeInterleaveIO, which means that - - it can be run at any time. It's even possible for observers to run out - - of order, as different parts of the ByteString are consumed. - - - - All the usual caveats about using unsafeInterleaveIO apply to the observers, - - so use caution. - -} -hGetContentsObserved :: Handle -> (Int -> IO ()) -> IO L.ByteString -hGetContentsObserved h observe = lazyRead - where - lazyRead = unsafeInterleaveIO loop - - loop = do - c <- S.hGetSome h defaultChunkSize - if S.null c - then do - hClose h - return $ L.empty - else do - observe $ S.length c - {- unsafeInterleaveIO causes this to be - - deferred until the data is read from the - - ByteString. -} - cs <- lazyRead - return $ L.append (L.fromChunks [c]) cs - -{- Same default chunk size Lazy ByteStrings use. -} -defaultChunkSize :: Int -defaultChunkSize = 32 * k - chunkOverhead - where - k = 1024 - chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific diff --git a/Utility/Rsync.hs b/Utility/Rsync.hs index e03824239..afb3dcbc8 100644 --- a/Utility/Rsync.hs +++ b/Utility/Rsync.hs @@ -8,6 +8,7 @@ module Utility.Rsync where import Common +import Utility.Metered import Data.Char @@ -44,14 +45,13 @@ rsyncServerParams = rsync :: [CommandParam] -> IO Bool rsync = boolSystem "rsync" -{- Runs rsync, but intercepts its progress output and feeds bytes - - complete values into the callback. The progress output is also output - - to stdout. +{- Runs rsync, but intercepts its progress output and updates a meter. + - The progress output is also output to stdout. - - The params must enable rsync's --progress mode for this to work. -} -rsyncProgress :: (Integer -> IO ()) -> [CommandParam] -> IO Bool -rsyncProgress callback params = do +rsyncProgress :: MeterUpdate -> [CommandParam] -> IO Bool +rsyncProgress meterupdate params = do r <- withHandle StdoutHandle createProcessSuccess p (feedprogress 0 []) {- For an unknown reason, piping rsync's output like this does - causes it to run a second ssh process, which it neglects to wait @@ -72,7 +72,7 @@ rsyncProgress callback params = do Nothing -> feedprogress prev buf' h (Just bytes) -> do when (bytes /= prev) $ - callback bytes + meterupdate $ toBytesProcessed bytes feedprogress bytes buf' h {- Checks if an rsync url involves the remote shell (ssh or rsh). |