summaryrefslogtreecommitdiff
path: root/Utility
diff options
context:
space:
mode:
Diffstat (limited to 'Utility')
-rw-r--r--Utility/Metered.hs116
-rw-r--r--Utility/Observed.hs43
-rw-r--r--Utility/Rsync.hs12
3 files changed, 122 insertions, 49 deletions
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
new file mode 100644
index 000000000..f33ad443a
--- /dev/null
+++ b/Utility/Metered.hs
@@ -0,0 +1,116 @@
+{- Metered IO
+ -
+ - Copyright 2012, 2013 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeSynonymInstances #-}
+
+module Utility.Metered where
+
+import Common
+
+import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString as S
+import System.IO.Unsafe
+import Foreign.Storable (Storable(sizeOf))
+import System.Posix.Types
+
+{- An action that can be run repeatedly, updating it on the bytes processed.
+ -
+ - Note that each call receives the total number of bytes processed, so
+ - far, *not* an incremental amount since the last call. -}
+type MeterUpdate = (BytesProcessed -> IO ())
+
+{- Total number of bytes processed so far. -}
+newtype BytesProcessed = BytesProcessed Integer
+ deriving (Eq, Ord)
+
+class AsBytesProcessed a where
+ toBytesProcessed :: a -> BytesProcessed
+ fromBytesProcessed :: BytesProcessed -> a
+
+instance AsBytesProcessed Integer where
+ toBytesProcessed i = BytesProcessed i
+ fromBytesProcessed (BytesProcessed i) = i
+
+instance AsBytesProcessed Int where
+ toBytesProcessed i = BytesProcessed $ toInteger i
+ fromBytesProcessed (BytesProcessed i) = fromInteger i
+
+instance AsBytesProcessed FileOffset where
+ toBytesProcessed sz = BytesProcessed $ toInteger sz
+ fromBytesProcessed (BytesProcessed sz) = fromInteger sz
+
+addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
+addBytesProcessed (BytesProcessed i) v =
+ let (BytesProcessed n) = toBytesProcessed v
+ in BytesProcessed $! i + n
+
+zeroBytesProcessed :: BytesProcessed
+zeroBytesProcessed = BytesProcessed 0
+
+{- Sends the content of a file to an action, updating the meter as it's
+ - consumed. -}
+withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
+withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
+ hGetContentsMetered h meterupdate >>= a
+
+{- Sends the content of a file to a Handle, updating the meter as it's
+ - written. -}
+streamMeteredFile :: FilePath -> MeterUpdate -> Handle -> IO ()
+streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
+
+{- Writes a ByteString to a Handle, updating a meter as it's written. -}
+meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
+meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks
+ where
+ go _ [] = return ()
+ go sofar (c:cs) = do
+ S.hPut h c
+ let sofar' = addBytesProcessed sofar $ S.length c
+ meterupdate sofar'
+ go sofar' cs
+
+meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
+meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
+ meteredWrite meterupdate h b
+
+{- This is like L.hGetContents, but after each chunk is read, a meter
+ - is updated based on the size of the chunk.
+ -
+ - Note that the meter update is run in unsafeInterleaveIO, which means that
+ - it can be run at any time. It's even possible for updates to run out
+ - of order, as different parts of the ByteString are consumed.
+ -
+ - All the usual caveats about using unsafeInterleaveIO apply to the
+ - meter updates, so use caution.
+ -}
+hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
+hGetContentsMetered h meterupdate = lazyRead zeroBytesProcessed
+ where
+ lazyRead sofar = unsafeInterleaveIO $ loop sofar
+
+ loop sofar = do
+ c <- S.hGetSome h defaultChunkSize
+ if S.null c
+ then do
+ hClose h
+ return $ L.empty
+ else do
+ let sofar' = addBytesProcessed sofar $
+ S.length c
+ meterupdate sofar'
+ {- unsafeInterleaveIO causes this to be
+ - deferred until the data is read from the
+ - ByteString. -}
+ cs <- lazyRead sofar'
+ return $ L.append (L.fromChunks [c]) cs
+
+{- Same default chunk size Lazy ByteStrings use. -}
+defaultChunkSize :: Int
+defaultChunkSize = 32 * k - chunkOverhead
+ where
+ k = 1024
+ chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
diff --git a/Utility/Observed.hs b/Utility/Observed.hs
deleted file mode 100644
index 3ee973429..000000000
--- a/Utility/Observed.hs
+++ /dev/null
@@ -1,43 +0,0 @@
-module Utility.Observed where
-
-import qualified Data.ByteString.Lazy as L
-import qualified Data.ByteString as S
-import System.IO
-import System.IO.Unsafe
-import Foreign.Storable (Storable(sizeOf))
-
-{- This is like L.hGetContents, but after each chunk is read, an action
- - is run to observe the size of the chunk.
- -
- - Note that the observer is run in unsafeInterleaveIO, which means that
- - it can be run at any time. It's even possible for observers to run out
- - of order, as different parts of the ByteString are consumed.
- -
- - All the usual caveats about using unsafeInterleaveIO apply to the observers,
- - so use caution.
- -}
-hGetContentsObserved :: Handle -> (Int -> IO ()) -> IO L.ByteString
-hGetContentsObserved h observe = lazyRead
- where
- lazyRead = unsafeInterleaveIO loop
-
- loop = do
- c <- S.hGetSome h defaultChunkSize
- if S.null c
- then do
- hClose h
- return $ L.empty
- else do
- observe $ S.length c
- {- unsafeInterleaveIO causes this to be
- - deferred until the data is read from the
- - ByteString. -}
- cs <- lazyRead
- return $ L.append (L.fromChunks [c]) cs
-
-{- Same default chunk size Lazy ByteStrings use. -}
-defaultChunkSize :: Int
-defaultChunkSize = 32 * k - chunkOverhead
- where
- k = 1024
- chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
diff --git a/Utility/Rsync.hs b/Utility/Rsync.hs
index e03824239..afb3dcbc8 100644
--- a/Utility/Rsync.hs
+++ b/Utility/Rsync.hs
@@ -8,6 +8,7 @@
module Utility.Rsync where
import Common
+import Utility.Metered
import Data.Char
@@ -44,14 +45,13 @@ rsyncServerParams =
rsync :: [CommandParam] -> IO Bool
rsync = boolSystem "rsync"
-{- Runs rsync, but intercepts its progress output and feeds bytes
- - complete values into the callback. The progress output is also output
- - to stdout.
+{- Runs rsync, but intercepts its progress output and updates a meter.
+ - The progress output is also output to stdout.
-
- The params must enable rsync's --progress mode for this to work.
-}
-rsyncProgress :: (Integer -> IO ()) -> [CommandParam] -> IO Bool
-rsyncProgress callback params = do
+rsyncProgress :: MeterUpdate -> [CommandParam] -> IO Bool
+rsyncProgress meterupdate params = do
r <- withHandle StdoutHandle createProcessSuccess p (feedprogress 0 [])
{- For an unknown reason, piping rsync's output like this does
- causes it to run a second ssh process, which it neglects to wait
@@ -72,7 +72,7 @@ rsyncProgress callback params = do
Nothing -> feedprogress prev buf' h
(Just bytes) -> do
when (bytes /= prev) $
- callback bytes
+ meterupdate $ toBytesProcessed bytes
feedprogress bytes buf' h
{- Checks if an rsync url involves the remote shell (ssh or rsh).