diff options
-rw-r--r-- | Assistant/Threads/TransferPoller.hs | 21 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 20 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 2 | ||||
-rw-r--r-- | Logs/Transfer.hs | 4 | ||||
-rw-r--r-- | Remote/Git.hs | 24 | ||||
-rw-r--r-- | Remote/Rsync.hs | 19 | ||||
-rw-r--r-- | Utility/DirWatcher.hs | 14 | ||||
-rw-r--r-- | Utility/INotify.hs | 13 | ||||
-rw-r--r-- | Utility/Misc.hs | 28 | ||||
-rw-r--r-- | Utility/Rsync.hs | 28 | ||||
-rw-r--r-- | Utility/Types/DirWatcher.hs | 3 | ||||
-rw-r--r-- | doc/design/assistant/blog/day_87__more_progress_progress.mdwn | 28 | ||||
-rw-r--r-- | doc/design/assistant/progressbars.mdwn | 29 |
13 files changed, 187 insertions, 46 deletions
diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs index e31dfb40c..10ed7dd31 100644 --- a/Assistant/Threads/TransferPoller.hs +++ b/Assistant/Threads/TransferPoller.hs @@ -12,6 +12,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Logs.Transfer import Utility.NotificationBroadcaster +import qualified Assistant.Threads.TransferWatcher as TransferWatcher import Control.Concurrent import qualified Data.Map as M @@ -42,8 +43,20 @@ transferPollerThread st dstatus = thread $ do sz <- catchMaybeIO $ fromIntegral . fileSize <$> getFileStatus f - when (bytesComplete info /= sz && isJust sz) $ - alterTransferInfo dstatus t $ - \i -> i { bytesComplete = sz } - {- can't poll uploads -} + newsize t info sz + {- Uploads don't need to be polled for when the + - TransferWatcher thread can track file + - modifications. -} + | TransferWatcher.watchesTransferSize = noop + {- Otherwise, this code polls the upload progress + - by reading the transfer info file. -} + | otherwise = do + let f = transferFile t g + mi <- catchDefaultIO Nothing $ + readTransferInfoFile Nothing f + maybe noop (newsize t info . bytesComplete) mi + newsize t info sz + | bytesComplete info /= sz && isJust sz = + alterTransferInfo dstatus t $ + \i -> i { bytesComplete = sz } | otherwise = noop diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index ce0708a91..d4ff9176e 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -30,6 +30,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do let hooks = mkWatchHooks { addHook = hook onAdd , delHook = hook onDel + , modifyHook = hook onModify , errHook = hook onErr } void $ watchDir dir (const False) hooks id @@ -71,6 +72,25 @@ onAdd st dstatus _ file _ = case parseTransferFile file of { transferRemote = r } sameuuid t r = Remote.uuid r == transferUUID t +{- Called when a transfer information file is updated. + - + - The only thing that should change in the transfer info is the + - bytesComplete, so that's the only thing updated in the DaemonStatus. -} +onModify :: Handler +onModify _ dstatus _ file _ = do + case parseTransferFile file of + Nothing -> noop + Just t -> go t =<< readTransferInfoFile Nothing file + where + go _ Nothing = noop + go t (Just newinfo) = alterTransferInfo dstatus t $ \info -> + info { bytesComplete = bytesComplete newinfo } + +{- This thread can only watch transfer sizes when the DirWatcher supports + - tracking modificatons to files. -} +watchesTransferSize :: Bool +watchesTransferSize = modifyTracked + {- Called when a transfer information file is removed. -} onDel :: Handler onDel st dstatus transferqueue file _ = case parseTransferFile file of diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index fa8b7b379..41396a23c 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -67,7 +67,7 @@ watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do where startup = startupScan st dstatus hook delay a = Just $ runHandler thisThread delay st dstatus transferqueue changechan a - hooks delayadd = WatchHooks + hooks delayadd = mkWatchHooks { addHook = hook (Seconds <$> delayadd) onAdd , delHook = hook Nothing onDel , addSymlinkHook = hook Nothing onAddSymlink diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs index d7f7a8d16..ac2606d8c 100644 --- a/Logs/Transfer.hs +++ b/Logs/Transfer.hs @@ -239,8 +239,8 @@ readTransferInfo mpid s = TransferInfo <*> pure (if null filename then Nothing else Just filename) <*> pure False where - (bits, filebits) = splitAt 1 $ lines s - filename = join "\n" filebits + (firstline, filename) = separate (== '\n') s + bits = split " " firstline numbits = length bits time = if numbits > 0 then Just <$> parsePOSIXTime (bits !! 0) diff --git a/Remote/Git.hs b/Remote/Git.hs index 80e73ede9..cd38cac06 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -37,6 +37,8 @@ import Init import Types.Key import qualified Fields +import Control.Concurrent + remote :: RemoteType remote = RemoteType { typename = "git", @@ -297,13 +299,27 @@ rsyncHelper callback params = do - filesystem. Then cp could be faster. -} rsyncOrCopyFile :: [CommandParam] -> FilePath -> FilePath -> ProgressCallback -> Annex Bool rsyncOrCopyFile rsyncparams src dest p = - ifM (sameDeviceIds src dest) - ( liftIO $ copyFileExternal src dest - , rsyncHelper (Just p) $ rsyncparams ++ [Param src, Param dest] - ) + ifM (sameDeviceIds src dest) (dorsync, docopy) where sameDeviceIds a b = (==) <$> (getDeviceId a) <*> (getDeviceId b) getDeviceId f = deviceID <$> liftIO (getFileStatus $ parentDir f) + dorsync = rsyncHelper (Just p) $ + rsyncparams ++ [Param src, Param dest] + docopy = liftIO $ bracket + (forkIO $ watchfilesize 0) + (void . tryIO . killThread) + (const $ copyFileExternal src dest) + watchfilesize oldsz = do + threadDelay 500000 -- 0.5 seconds + v <- catchMaybeIO $ + fromIntegral . fileSize + <$> getFileStatus dest + case v of + Just sz + | sz /= oldsz -> do + p sz + watchfilesize sz + _ -> watchfilesize oldsz {- Generates rsync parameters that ssh to the remote and asks it - to either receive or send the key's content. -} diff --git a/Remote/Rsync.hs b/Remote/Rsync.hs index aba427d4b..5384920fb 100644 --- a/Remote/Rsync.hs +++ b/Remote/Rsync.hs @@ -105,16 +105,16 @@ rsyncUrls o k = map use annexHashes f = keyFile k store :: RsyncOpts -> Key -> AssociatedFile -> ProgressCallback -> Annex Bool -store o k _f p = rsyncSend o k <=< inRepo $ gitAnnexLocation k +store o k _f p = rsyncSend o p k <=< inRepo $ gitAnnexLocation k storeEncrypted :: RsyncOpts -> (Cipher, Key) -> Key -> ProgressCallback -> Annex Bool storeEncrypted o (cipher, enck) k p = withTmp enck $ \tmp -> do src <- inRepo $ gitAnnexLocation k liftIO $ withEncryptedContent cipher (L.readFile src) $ L.writeFile tmp - rsyncSend o enck tmp + rsyncSend o p enck tmp retrieve :: RsyncOpts -> Key -> AssociatedFile -> FilePath -> Annex Bool -retrieve o k _ f = untilTrue (rsyncUrls o k) $ \u -> rsyncRemote o +retrieve o k _ f = untilTrue (rsyncUrls o k) $ \u -> rsyncRemote o Nothing -- use inplace when retrieving to support resuming [ Param "--inplace" , Param u @@ -191,10 +191,10 @@ withRsyncScratchDir a = do nuke d = liftIO $ whenM (doesDirectoryExist d) $ removeDirectoryRecursive d -rsyncRemote :: RsyncOpts -> [CommandParam] -> Annex Bool -rsyncRemote o params = do +rsyncRemote :: RsyncOpts -> (Maybe ProgressCallback) -> [CommandParam] -> Annex Bool +rsyncRemote o callback params = do showOutput -- make way for progress bar - ifM (liftIO $ rsync $ rsyncOptions o ++ defaultParams ++ params) + ifM (liftIO $ (maybe rsync rsyncProgress callback) ps) ( return True , do showLongNote "rsync failed -- run git annex again to resume file transfer" @@ -202,16 +202,17 @@ rsyncRemote o params = do ) where defaultParams = [Params "--progress"] + ps = rsyncOptions o ++ defaultParams ++ params {- To send a single key is slightly tricky; need to build up a temporary directory structure to pass to rsync so it can create the hash directories. -} -rsyncSend :: RsyncOpts -> Key -> FilePath -> Annex Bool -rsyncSend o k src = withRsyncScratchDir $ \tmp -> do +rsyncSend :: RsyncOpts -> ProgressCallback -> Key -> FilePath -> Annex Bool +rsyncSend o callback k src = withRsyncScratchDir $ \tmp -> do let dest = tmp </> Prelude.head (keyPaths k) liftIO $ createDirectoryIfMissing True $ parentDir dest liftIO $ createLink src dest - rsyncRemote o + rsyncRemote o (Just callback) [ Param "--recursive" , partialParams -- tmp/ to send contents of tmp dir diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs index 213aeb50a..e4ee83191 100644 --- a/Utility/DirWatcher.hs +++ b/Utility/DirWatcher.hs @@ -72,6 +72,20 @@ closingTracked = undefined #endif #endif +/* With inotify, modifications to existing files can be tracked. + * Kqueue does not support this. + */ +modifyTracked :: Bool +#if WITH_INOTIFY +modifyTracked = True +#else +#if WITH_KQUEUE +modifyTracked = False +#else +modifyTracked = undefined +#endif +#endif + /* Starts a watcher thread. The runStartup action is passed a scanner action * to run, that will return once the initial directory scan is complete. * Once runStartup returns, the watcher thread continues running, diff --git a/Utility/INotify.hs b/Utility/INotify.hs index 6af022819..7934c2446 100644 --- a/Utility/INotify.hs +++ b/Utility/INotify.hs @@ -38,9 +38,8 @@ import Control.Exception (throw) - Note: Moving a file will cause events deleting it from its old location - and adding it to the new location. - - - Note: Modification of files is not detected, and it's assumed that when - - a file that was open for write is closed, it's finished being written - - to, and can be added. + - Note: It's assumed that when a file that was open for write is closed, + - it's finished being written to, and can be added. - - Note: inotify has a limit to the number of watches allowed, - /proc/sys/fs/inotify/max_user_watches (default 8192). @@ -66,13 +65,16 @@ watchDir i dir ignored hooks -- Select only inotify events required by the enabled -- hooks, but always include Create so new directories can -- be scanned. - watchevents = Create : addevents ++ delevents + watchevents = Create : addevents ++ delevents ++ modifyevents addevents | hashook addHook || hashook addSymlinkHook = [MoveIn, CloseWrite] | otherwise = [] delevents | hashook delHook || hashook delDirHook = [MoveOut, Delete] | otherwise = [] + modifyevents + | hashook modifyHook = [Modify] + | otherwise = [] scan f = unless (ignored f) $ do ms <- getstatus f @@ -114,6 +116,9 @@ watchDir i dir ignored hooks | otherwise = guarded $ runhook delHook f Nothing where guarded = unlessM (filetype (const True) f) + go (Modified { isDirectory = isd, maybeFilePath = Just f }) + | isd = noop + | otherwise = runhook modifyHook f Nothing go _ = noop hashook h = isJust $ h hooks diff --git a/Utility/Misc.hs b/Utility/Misc.hs index 77ebb4f3d..349b20efe 100644 --- a/Utility/Misc.hs +++ b/Utility/Misc.hs @@ -9,6 +9,9 @@ module Utility.Misc where import System.IO import Control.Monad +import Foreign +import Data.Char +import Control.Applicative {- A version of hgetContents that is not lazy. Ensures file is - all read before it gets closed. -} @@ -47,8 +50,31 @@ segment p l = map reverse $ go [] [] l | otherwise = go (i:c) r is {- Given two orderings, returns the second if the first is EQ and returns - - the first otherwise. -} + - the first otherwise. + - + - Example use: + - + - compare lname1 lname2 `thenOrd` compare fname1 fname2 + -} thenOrd :: Ordering -> Ordering -> Ordering thenOrd EQ x = x thenOrd x _ = x {-# INLINE thenOrd #-} + +{- Wrapper around hGetBufSome that returns a String. + - + - The null string is returned on eof, otherwise returns whatever + - data is currently available to read from the handle, or waits for + - data to be written to it if none is currently available. + - + - Note on encodings: The normal encoding of the Handle is ignored; + - each byte is converted to a Char. Not unicode clean! + -} +hGetSomeString :: Handle -> Int -> IO String +hGetSomeString h sz = do + fp <- mallocForeignPtrBytes sz + len <- withForeignPtr fp $ \buf -> hGetBufSome h buf sz + map (chr . fromIntegral) <$> withForeignPtr fp (peekbytes len) + where + peekbytes :: Int -> Ptr Word8 -> IO [Word8] + peekbytes len buf = mapM (peekElemOff buf) [0..pred len] diff --git a/Utility/Rsync.hs b/Utility/Rsync.hs index 0fe4a8986..08caeb12b 100644 --- a/Utility/Rsync.hs +++ b/Utility/Rsync.hs @@ -48,21 +48,29 @@ rsync = boolSystem "rsync" {- Runs rsync, but intercepts its progress output and feeds bytes - complete values into the callback. The progress output is also output - - to stdout. -} + - to stdout. + - + - The params must enable rsync's --progress mode for this to work. + -} rsyncProgress :: (Integer -> IO ()) -> [CommandParam] -> IO Bool rsyncProgress callback params = catchBoolIO $ - withHandle StdoutHandle createProcessSuccess p (feedprogress []) + withHandle StdoutHandle createProcessSuccess p (feedprogress 0 []) where p = proc "rsync" (toCommand params) - feedprogress buf h = - catchMaybeIO (hGetChar h) >>= \v -> case v of - Just c -> do - putChar c + feedprogress prev buf h = do + s <- hGetSomeString h 80 + if null s + then return True + else do + putStr s hFlush stdout - let (mbytes, buf') = parseRsyncProgress (buf++[c]) - maybe noop callback mbytes - feedprogress buf' h - Nothing -> return True + let (mbytes, buf') = parseRsyncProgress (buf++s) + case mbytes of + Nothing -> feedprogress prev buf' h + (Just bytes) -> do + when (bytes /= prev) $ + callback bytes + feedprogress bytes buf' h {- Checks if an rsync url involves the remote shell (ssh or rsh). - Use of such urls with rsync requires additional shell diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs index ba7eae6a1..30ada9c68 100644 --- a/Utility/Types/DirWatcher.hs +++ b/Utility/Types/DirWatcher.hs @@ -19,7 +19,8 @@ data WatchHooks = WatchHooks , delHook :: Hook FilePath , delDirHook :: Hook FilePath , errHook :: Hook String -- error message + , modifyHook :: Hook FilePath } mkWatchHooks :: WatchHooks -mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing +mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing Nothing diff --git a/doc/design/assistant/blog/day_87__more_progress_progress.mdwn b/doc/design/assistant/blog/day_87__more_progress_progress.mdwn new file mode 100644 index 000000000..c2b266d6d --- /dev/null +++ b/doc/design/assistant/blog/day_87__more_progress_progress.mdwn @@ -0,0 +1,28 @@ +Worked more on upload progress tracking. I'm fairly happy with its state +now: + +* It's fully implemented for rsync special remotes. + +* Git remotes also fully support it, with the + notable exception of file uploads run by `git-annex-shell recvkey`. That + runs `rsync --server --sender`, and in that mode, rsync refuses to output + progress info. Not sure what to do about this case. Maybe I should + write a parser for the rsync wire protocol that can tell what chunk of the + file is being sent, and shim it in front of the rsync server? That's + rather hardcore, but it seems the best of a bad grab bag of options that + include things like `LD_PRELOAD` hacks. + +* Also optimised the rsync progress bar reader to read whole + chunks of data rather than one byte at a time. + +* Also got progress bars to actually update in the webapp for uploads. + + This turned out to be tricky because kqueue cannot be used to detect when + existing files have been modified. (One of kqueue's worst shortcomings vs + inotify.) Currently on kqueue systems it has to poll. + +I will probably upload add progress tracking to the directory special remote, +which should be very easy (it already implements its own progress bars), +and leave the other special remotes for later. I can add upload progress +tracking to each special remote when I add support for configuring it in +the webapp. diff --git a/doc/design/assistant/progressbars.mdwn b/doc/design/assistant/progressbars.mdwn index ade9a8370..d3764e8dc 100644 --- a/doc/design/assistant/progressbars.mdwn +++ b/doc/design/assistant/progressbars.mdwn @@ -16,13 +16,22 @@ This is one of those potentially hidden but time consuming problems. ## uploads -Options: - -* Feed rsync output into a parser and parse out a progress value. Ugly, - failure prone, but potentially the least CPU-expensive option. -* Use librsync. Note: It's not wire-compatiable with the actual rsync - command. -* Set up a FIFO, have rsync read from or write to that, and the FIFO - feeder/reader then can update the transfer info. Generic enough to - work for most (all?) special remotes, but also the most expensive option, - involving another copy through memory of the whole file contents. +Each individual remote type needs to implement its own support for calling +the ProgressCallback as the upload progresses. + +* git: Done, with one exception: `git-annex-shell sendkey` runs `rsync + --server --sender` and in that mode it does not report progress info. + So downloads initiated by other repos do not show progress in the repo + doing the uploading. + + Maybe I should + write a proxy for the rsync wire protocol that can tell what chunk of the + file is being sent, and shim it in front of the rsync server? + +* rsync: **done** +* directory +* web: Not applicable; does not upload +* S3 +* bup +* hook: Would require the hook interface to somehow do this, which seems + too complicated. So skipping. |