summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/Threads/TransferPoller.hs21
-rw-r--r--Assistant/Threads/TransferWatcher.hs20
-rw-r--r--Assistant/Threads/Watcher.hs2
-rw-r--r--Logs/Transfer.hs4
-rw-r--r--Remote/Git.hs24
-rw-r--r--Remote/Rsync.hs19
-rw-r--r--Utility/DirWatcher.hs14
-rw-r--r--Utility/INotify.hs13
-rw-r--r--Utility/Misc.hs28
-rw-r--r--Utility/Rsync.hs28
-rw-r--r--Utility/Types/DirWatcher.hs3
-rw-r--r--doc/design/assistant/blog/day_87__more_progress_progress.mdwn28
-rw-r--r--doc/design/assistant/progressbars.mdwn29
13 files changed, 187 insertions, 46 deletions
diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs
index e31dfb40c..10ed7dd31 100644
--- a/Assistant/Threads/TransferPoller.hs
+++ b/Assistant/Threads/TransferPoller.hs
@@ -12,6 +12,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Logs.Transfer
import Utility.NotificationBroadcaster
+import qualified Assistant.Threads.TransferWatcher as TransferWatcher
import Control.Concurrent
import qualified Data.Map as M
@@ -42,8 +43,20 @@ transferPollerThread st dstatus = thread $ do
sz <- catchMaybeIO $
fromIntegral . fileSize
<$> getFileStatus f
- when (bytesComplete info /= sz && isJust sz) $
- alterTransferInfo dstatus t $
- \i -> i { bytesComplete = sz }
- {- can't poll uploads -}
+ newsize t info sz
+ {- Uploads don't need to be polled for when the
+ - TransferWatcher thread can track file
+ - modifications. -}
+ | TransferWatcher.watchesTransferSize = noop
+ {- Otherwise, this code polls the upload progress
+ - by reading the transfer info file. -}
+ | otherwise = do
+ let f = transferFile t g
+ mi <- catchDefaultIO Nothing $
+ readTransferInfoFile Nothing f
+ maybe noop (newsize t info . bytesComplete) mi
+ newsize t info sz
+ | bytesComplete info /= sz && isJust sz =
+ alterTransferInfo dstatus t $
+ \i -> i { bytesComplete = sz }
| otherwise = noop
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
index ce0708a91..d4ff9176e 100644
--- a/Assistant/Threads/TransferWatcher.hs
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -30,6 +30,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do
let hooks = mkWatchHooks
{ addHook = hook onAdd
, delHook = hook onDel
+ , modifyHook = hook onModify
, errHook = hook onErr
}
void $ watchDir dir (const False) hooks id
@@ -71,6 +72,25 @@ onAdd st dstatus _ file _ = case parseTransferFile file of
{ transferRemote = r }
sameuuid t r = Remote.uuid r == transferUUID t
+{- Called when a transfer information file is updated.
+ -
+ - The only thing that should change in the transfer info is the
+ - bytesComplete, so that's the only thing updated in the DaemonStatus. -}
+onModify :: Handler
+onModify _ dstatus _ file _ = do
+ case parseTransferFile file of
+ Nothing -> noop
+ Just t -> go t =<< readTransferInfoFile Nothing file
+ where
+ go _ Nothing = noop
+ go t (Just newinfo) = alterTransferInfo dstatus t $ \info ->
+ info { bytesComplete = bytesComplete newinfo }
+
+{- This thread can only watch transfer sizes when the DirWatcher supports
+ - tracking modificatons to files. -}
+watchesTransferSize :: Bool
+watchesTransferSize = modifyTracked
+
{- Called when a transfer information file is removed. -}
onDel :: Handler
onDel st dstatus transferqueue file _ = case parseTransferFile file of
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index fa8b7b379..41396a23c 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -67,7 +67,7 @@ watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do
where
startup = startupScan st dstatus
hook delay a = Just $ runHandler thisThread delay st dstatus transferqueue changechan a
- hooks delayadd = WatchHooks
+ hooks delayadd = mkWatchHooks
{ addHook = hook (Seconds <$> delayadd) onAdd
, delHook = hook Nothing onDel
, addSymlinkHook = hook Nothing onAddSymlink
diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs
index d7f7a8d16..ac2606d8c 100644
--- a/Logs/Transfer.hs
+++ b/Logs/Transfer.hs
@@ -239,8 +239,8 @@ readTransferInfo mpid s = TransferInfo
<*> pure (if null filename then Nothing else Just filename)
<*> pure False
where
- (bits, filebits) = splitAt 1 $ lines s
- filename = join "\n" filebits
+ (firstline, filename) = separate (== '\n') s
+ bits = split " " firstline
numbits = length bits
time = if numbits > 0
then Just <$> parsePOSIXTime (bits !! 0)
diff --git a/Remote/Git.hs b/Remote/Git.hs
index 80e73ede9..cd38cac06 100644
--- a/Remote/Git.hs
+++ b/Remote/Git.hs
@@ -37,6 +37,8 @@ import Init
import Types.Key
import qualified Fields
+import Control.Concurrent
+
remote :: RemoteType
remote = RemoteType {
typename = "git",
@@ -297,13 +299,27 @@ rsyncHelper callback params = do
- filesystem. Then cp could be faster. -}
rsyncOrCopyFile :: [CommandParam] -> FilePath -> FilePath -> ProgressCallback -> Annex Bool
rsyncOrCopyFile rsyncparams src dest p =
- ifM (sameDeviceIds src dest)
- ( liftIO $ copyFileExternal src dest
- , rsyncHelper (Just p) $ rsyncparams ++ [Param src, Param dest]
- )
+ ifM (sameDeviceIds src dest) (dorsync, docopy)
where
sameDeviceIds a b = (==) <$> (getDeviceId a) <*> (getDeviceId b)
getDeviceId f = deviceID <$> liftIO (getFileStatus $ parentDir f)
+ dorsync = rsyncHelper (Just p) $
+ rsyncparams ++ [Param src, Param dest]
+ docopy = liftIO $ bracket
+ (forkIO $ watchfilesize 0)
+ (void . tryIO . killThread)
+ (const $ copyFileExternal src dest)
+ watchfilesize oldsz = do
+ threadDelay 500000 -- 0.5 seconds
+ v <- catchMaybeIO $
+ fromIntegral . fileSize
+ <$> getFileStatus dest
+ case v of
+ Just sz
+ | sz /= oldsz -> do
+ p sz
+ watchfilesize sz
+ _ -> watchfilesize oldsz
{- Generates rsync parameters that ssh to the remote and asks it
- to either receive or send the key's content. -}
diff --git a/Remote/Rsync.hs b/Remote/Rsync.hs
index aba427d4b..5384920fb 100644
--- a/Remote/Rsync.hs
+++ b/Remote/Rsync.hs
@@ -105,16 +105,16 @@ rsyncUrls o k = map use annexHashes
f = keyFile k
store :: RsyncOpts -> Key -> AssociatedFile -> ProgressCallback -> Annex Bool
-store o k _f p = rsyncSend o k <=< inRepo $ gitAnnexLocation k
+store o k _f p = rsyncSend o p k <=< inRepo $ gitAnnexLocation k
storeEncrypted :: RsyncOpts -> (Cipher, Key) -> Key -> ProgressCallback -> Annex Bool
storeEncrypted o (cipher, enck) k p = withTmp enck $ \tmp -> do
src <- inRepo $ gitAnnexLocation k
liftIO $ withEncryptedContent cipher (L.readFile src) $ L.writeFile tmp
- rsyncSend o enck tmp
+ rsyncSend o p enck tmp
retrieve :: RsyncOpts -> Key -> AssociatedFile -> FilePath -> Annex Bool
-retrieve o k _ f = untilTrue (rsyncUrls o k) $ \u -> rsyncRemote o
+retrieve o k _ f = untilTrue (rsyncUrls o k) $ \u -> rsyncRemote o Nothing
-- use inplace when retrieving to support resuming
[ Param "--inplace"
, Param u
@@ -191,10 +191,10 @@ withRsyncScratchDir a = do
nuke d = liftIO $ whenM (doesDirectoryExist d) $
removeDirectoryRecursive d
-rsyncRemote :: RsyncOpts -> [CommandParam] -> Annex Bool
-rsyncRemote o params = do
+rsyncRemote :: RsyncOpts -> (Maybe ProgressCallback) -> [CommandParam] -> Annex Bool
+rsyncRemote o callback params = do
showOutput -- make way for progress bar
- ifM (liftIO $ rsync $ rsyncOptions o ++ defaultParams ++ params)
+ ifM (liftIO $ (maybe rsync rsyncProgress callback) ps)
( return True
, do
showLongNote "rsync failed -- run git annex again to resume file transfer"
@@ -202,16 +202,17 @@ rsyncRemote o params = do
)
where
defaultParams = [Params "--progress"]
+ ps = rsyncOptions o ++ defaultParams ++ params
{- To send a single key is slightly tricky; need to build up a temporary
directory structure to pass to rsync so it can create the hash
directories. -}
-rsyncSend :: RsyncOpts -> Key -> FilePath -> Annex Bool
-rsyncSend o k src = withRsyncScratchDir $ \tmp -> do
+rsyncSend :: RsyncOpts -> ProgressCallback -> Key -> FilePath -> Annex Bool
+rsyncSend o callback k src = withRsyncScratchDir $ \tmp -> do
let dest = tmp </> Prelude.head (keyPaths k)
liftIO $ createDirectoryIfMissing True $ parentDir dest
liftIO $ createLink src dest
- rsyncRemote o
+ rsyncRemote o (Just callback)
[ Param "--recursive"
, partialParams
-- tmp/ to send contents of tmp dir
diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs
index 213aeb50a..e4ee83191 100644
--- a/Utility/DirWatcher.hs
+++ b/Utility/DirWatcher.hs
@@ -72,6 +72,20 @@ closingTracked = undefined
#endif
#endif
+/* With inotify, modifications to existing files can be tracked.
+ * Kqueue does not support this.
+ */
+modifyTracked :: Bool
+#if WITH_INOTIFY
+modifyTracked = True
+#else
+#if WITH_KQUEUE
+modifyTracked = False
+#else
+modifyTracked = undefined
+#endif
+#endif
+
/* Starts a watcher thread. The runStartup action is passed a scanner action
* to run, that will return once the initial directory scan is complete.
* Once runStartup returns, the watcher thread continues running,
diff --git a/Utility/INotify.hs b/Utility/INotify.hs
index 6af022819..7934c2446 100644
--- a/Utility/INotify.hs
+++ b/Utility/INotify.hs
@@ -38,9 +38,8 @@ import Control.Exception (throw)
- Note: Moving a file will cause events deleting it from its old location
- and adding it to the new location.
-
- - Note: Modification of files is not detected, and it's assumed that when
- - a file that was open for write is closed, it's finished being written
- - to, and can be added.
+ - Note: It's assumed that when a file that was open for write is closed,
+ - it's finished being written to, and can be added.
-
- Note: inotify has a limit to the number of watches allowed,
- /proc/sys/fs/inotify/max_user_watches (default 8192).
@@ -66,13 +65,16 @@ watchDir i dir ignored hooks
-- Select only inotify events required by the enabled
-- hooks, but always include Create so new directories can
-- be scanned.
- watchevents = Create : addevents ++ delevents
+ watchevents = Create : addevents ++ delevents ++ modifyevents
addevents
| hashook addHook || hashook addSymlinkHook = [MoveIn, CloseWrite]
| otherwise = []
delevents
| hashook delHook || hashook delDirHook = [MoveOut, Delete]
| otherwise = []
+ modifyevents
+ | hashook modifyHook = [Modify]
+ | otherwise = []
scan f = unless (ignored f) $ do
ms <- getstatus f
@@ -114,6 +116,9 @@ watchDir i dir ignored hooks
| otherwise = guarded $ runhook delHook f Nothing
where
guarded = unlessM (filetype (const True) f)
+ go (Modified { isDirectory = isd, maybeFilePath = Just f })
+ | isd = noop
+ | otherwise = runhook modifyHook f Nothing
go _ = noop
hashook h = isJust $ h hooks
diff --git a/Utility/Misc.hs b/Utility/Misc.hs
index 77ebb4f3d..349b20efe 100644
--- a/Utility/Misc.hs
+++ b/Utility/Misc.hs
@@ -9,6 +9,9 @@ module Utility.Misc where
import System.IO
import Control.Monad
+import Foreign
+import Data.Char
+import Control.Applicative
{- A version of hgetContents that is not lazy. Ensures file is
- all read before it gets closed. -}
@@ -47,8 +50,31 @@ segment p l = map reverse $ go [] [] l
| otherwise = go (i:c) r is
{- Given two orderings, returns the second if the first is EQ and returns
- - the first otherwise. -}
+ - the first otherwise.
+ -
+ - Example use:
+ -
+ - compare lname1 lname2 `thenOrd` compare fname1 fname2
+ -}
thenOrd :: Ordering -> Ordering -> Ordering
thenOrd EQ x = x
thenOrd x _ = x
{-# INLINE thenOrd #-}
+
+{- Wrapper around hGetBufSome that returns a String.
+ -
+ - The null string is returned on eof, otherwise returns whatever
+ - data is currently available to read from the handle, or waits for
+ - data to be written to it if none is currently available.
+ -
+ - Note on encodings: The normal encoding of the Handle is ignored;
+ - each byte is converted to a Char. Not unicode clean!
+ -}
+hGetSomeString :: Handle -> Int -> IO String
+hGetSomeString h sz = do
+ fp <- mallocForeignPtrBytes sz
+ len <- withForeignPtr fp $ \buf -> hGetBufSome h buf sz
+ map (chr . fromIntegral) <$> withForeignPtr fp (peekbytes len)
+ where
+ peekbytes :: Int -> Ptr Word8 -> IO [Word8]
+ peekbytes len buf = mapM (peekElemOff buf) [0..pred len]
diff --git a/Utility/Rsync.hs b/Utility/Rsync.hs
index 0fe4a8986..08caeb12b 100644
--- a/Utility/Rsync.hs
+++ b/Utility/Rsync.hs
@@ -48,21 +48,29 @@ rsync = boolSystem "rsync"
{- Runs rsync, but intercepts its progress output and feeds bytes
- complete values into the callback. The progress output is also output
- - to stdout. -}
+ - to stdout.
+ -
+ - The params must enable rsync's --progress mode for this to work.
+ -}
rsyncProgress :: (Integer -> IO ()) -> [CommandParam] -> IO Bool
rsyncProgress callback params = catchBoolIO $
- withHandle StdoutHandle createProcessSuccess p (feedprogress [])
+ withHandle StdoutHandle createProcessSuccess p (feedprogress 0 [])
where
p = proc "rsync" (toCommand params)
- feedprogress buf h =
- catchMaybeIO (hGetChar h) >>= \v -> case v of
- Just c -> do
- putChar c
+ feedprogress prev buf h = do
+ s <- hGetSomeString h 80
+ if null s
+ then return True
+ else do
+ putStr s
hFlush stdout
- let (mbytes, buf') = parseRsyncProgress (buf++[c])
- maybe noop callback mbytes
- feedprogress buf' h
- Nothing -> return True
+ let (mbytes, buf') = parseRsyncProgress (buf++s)
+ case mbytes of
+ Nothing -> feedprogress prev buf' h
+ (Just bytes) -> do
+ when (bytes /= prev) $
+ callback bytes
+ feedprogress bytes buf' h
{- Checks if an rsync url involves the remote shell (ssh or rsh).
- Use of such urls with rsync requires additional shell
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index ba7eae6a1..30ada9c68 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -19,7 +19,8 @@ data WatchHooks = WatchHooks
, delHook :: Hook FilePath
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
+ , modifyHook :: Hook FilePath
}
mkWatchHooks :: WatchHooks
-mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing Nothing
diff --git a/doc/design/assistant/blog/day_87__more_progress_progress.mdwn b/doc/design/assistant/blog/day_87__more_progress_progress.mdwn
new file mode 100644
index 000000000..c2b266d6d
--- /dev/null
+++ b/doc/design/assistant/blog/day_87__more_progress_progress.mdwn
@@ -0,0 +1,28 @@
+Worked more on upload progress tracking. I'm fairly happy with its state
+now:
+
+* It's fully implemented for rsync special remotes.
+
+* Git remotes also fully support it, with the
+ notable exception of file uploads run by `git-annex-shell recvkey`. That
+ runs `rsync --server --sender`, and in that mode, rsync refuses to output
+ progress info. Not sure what to do about this case. Maybe I should
+ write a parser for the rsync wire protocol that can tell what chunk of the
+ file is being sent, and shim it in front of the rsync server? That's
+ rather hardcore, but it seems the best of a bad grab bag of options that
+ include things like `LD_PRELOAD` hacks.
+
+* Also optimised the rsync progress bar reader to read whole
+ chunks of data rather than one byte at a time.
+
+* Also got progress bars to actually update in the webapp for uploads.
+
+ This turned out to be tricky because kqueue cannot be used to detect when
+ existing files have been modified. (One of kqueue's worst shortcomings vs
+ inotify.) Currently on kqueue systems it has to poll.
+
+I will probably upload add progress tracking to the directory special remote,
+which should be very easy (it already implements its own progress bars),
+and leave the other special remotes for later. I can add upload progress
+tracking to each special remote when I add support for configuring it in
+the webapp.
diff --git a/doc/design/assistant/progressbars.mdwn b/doc/design/assistant/progressbars.mdwn
index ade9a8370..d3764e8dc 100644
--- a/doc/design/assistant/progressbars.mdwn
+++ b/doc/design/assistant/progressbars.mdwn
@@ -16,13 +16,22 @@ This is one of those potentially hidden but time consuming problems.
## uploads
-Options:
-
-* Feed rsync output into a parser and parse out a progress value. Ugly,
- failure prone, but potentially the least CPU-expensive option.
-* Use librsync. Note: It's not wire-compatiable with the actual rsync
- command.
-* Set up a FIFO, have rsync read from or write to that, and the FIFO
- feeder/reader then can update the transfer info. Generic enough to
- work for most (all?) special remotes, but also the most expensive option,
- involving another copy through memory of the whole file contents.
+Each individual remote type needs to implement its own support for calling
+the ProgressCallback as the upload progresses.
+
+* git: Done, with one exception: `git-annex-shell sendkey` runs `rsync
+ --server --sender` and in that mode it does not report progress info.
+ So downloads initiated by other repos do not show progress in the repo
+ doing the uploading.
+
+ Maybe I should
+ write a proxy for the rsync wire protocol that can tell what chunk of the
+ file is being sent, and shim it in front of the rsync server?
+
+* rsync: **done**
+* directory
+* web: Not applicable; does not upload
+* S3
+* bup
+* hook: Would require the hook interface to somehow do this, which seems
+ too complicated. So skipping.