aboutsummaryrefslogtreecommitdiff
path: root/Messages
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-09-09 15:49:44 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-09-09 15:51:34 -0400
commitd1f722e3b0b74a9d9a2e35ac1f47dbca9f7cf606 (patch)
tree74314a3747d4a0456135bf38b2e1115b5a3f64b9 /Messages
parent2408f5c6084aa04a09b36edcd264ce6bc7177c93 (diff)
better locking for json with -J
Avoid threads emitting json at the same time and scrambling, which was still possible even with the buffering, just less likely. Converted json IO actions to JSONChunk data too.
Diffstat (limited to 'Messages')
-rw-r--r--Messages/Internal.hs26
-rw-r--r--Messages/JSON.hs54
-rw-r--r--Messages/Progress.hs5
3 files changed, 53 insertions, 32 deletions
diff --git a/Messages/Internal.hs b/Messages/Internal.hs
index bf212b71b..2c9a461a5 100644
--- a/Messages/Internal.hs
+++ b/Messages/Internal.hs
@@ -11,17 +11,20 @@ import Common
import Annex
import Types.Messages
import Messages.Concurrent
+import Messages.JSON
+
+import qualified Data.ByteString.Lazy as B
withMessageState :: (MessageState -> Annex a) -> Annex a
withMessageState a = Annex.getState Annex.output >>= a
-outputMessage :: IO () -> String -> Annex ()
+outputMessage :: JSONChunk -> String -> Annex ()
outputMessage = outputMessage' False
-outputMessageFinal :: IO () -> String -> Annex ()
+outputMessageFinal :: JSONChunk -> String -> Annex ()
outputMessageFinal = outputMessage' True
-outputMessage' :: Bool -> IO () -> String -> Annex ()
+outputMessage' :: Bool -> JSONChunk -> String -> Annex ()
outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s of
NormalOutput
| concurrentOutputEnabled s -> concurrentMessage s False msg q
@@ -29,7 +32,7 @@ outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s
JSONOutput _ -> void $ outputJSON json endmessage s
QuietOutput -> q
-outputJSON :: IO () -> Bool -> MessageState -> Annex Bool
+outputJSON :: JSONChunk -> Bool -> MessageState -> Annex Bool
outputJSON json endmessage s = case outputType s of
JSONOutput withprogress
| withprogress || concurrentOutputEnabled s -> do
@@ -37,20 +40,17 @@ outputJSON json endmessage s = case outputType s of
if endmessage
then do
Annex.changeState $ \st ->
- st { Annex.output = s { jsonBuffer = [] } }
- liftIO $ flushed $ do
- showJSONBuffer s
- json
+ st { Annex.output = s { jsonBuffer = none } }
+ liftIO $ flushed $ emit b
else Annex.changeState $ \st ->
- st { Annex.output = s { jsonBuffer = json : jsonBuffer s } }
+ st { Annex.output = s { jsonBuffer = b } }
return True
| otherwise -> do
- liftIO $ flushed json
+ liftIO $ flushed $ emit json
return True
_ -> return False
-
-showJSONBuffer :: MessageState -> IO ()
-showJSONBuffer s = sequence_ $ reverse $ jsonBuffer s
+ where
+ b = jsonBuffer s `B.append` json
outputError :: String -> Annex ()
outputError msg = withMessageState $ \s ->
diff --git a/Messages/JSON.hs b/Messages/JSON.hs
index 7b94aa220..3baeaef3f 100644
--- a/Messages/JSON.hs
+++ b/Messages/JSON.hs
@@ -8,6 +8,9 @@
{-# LANGUAGE OverloadedStrings #-}
module Messages.JSON (
+ JSONChunk,
+ emit,
+ none,
start,
end,
note,
@@ -25,6 +28,8 @@ import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as B
import System.IO
+import System.IO.Unsafe (unsafePerformIO)
+import Control.Concurrent
import Data.Maybe
import Data.Monoid
import Prelude
@@ -34,8 +39,24 @@ import Types.Key
import Utility.Metered
import Utility.Percentage
-start :: String -> Maybe FilePath -> Maybe Key -> IO ()
-start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
+type JSONChunk = B.ByteString
+
+-- A global lock to avoid concurrent threads emitting json at the same time.
+{-# NOINLINE emitLock #-}
+emitLock :: MVar ()
+emitLock = unsafePerformIO $ newMVar ()
+
+emit :: JSONChunk -> IO ()
+emit v = do
+ takeMVar emitLock
+ B.hPut stdout v
+ putMVar emitLock ()
+
+none :: JSONChunk
+none = B.empty
+
+start :: String -> Maybe FilePath -> Maybe Key -> JSONChunk
+start command file key = Stream.start $ Stream.AesonObject o
where
Object o = toJSON $ JSONActionItem
{ itemCommand = Just command
@@ -44,25 +65,26 @@ start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
, itemAdded = Nothing
}
-end :: Bool -> IO ()
-end b = B.hPut stdout $ Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end
+end :: Bool -> JSONChunk
+end b =Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end
-note :: String -> IO ()
+note :: String -> JSONChunk
note s = add (Stream.JSONChunk [("note", s)])
-add :: Stream.JSONChunk a -> IO ()
-add = B.hPut stdout . Stream.add
+add :: Stream.JSONChunk a -> JSONChunk
+add = Stream.add
-complete :: Stream.JSONChunk a -> IO ()
-complete v = B.hPut stdout $ Stream.start v `B.append` Stream.end
+complete :: Stream.JSONChunk a -> JSONChunk
+complete v = Stream.start v `B.append` Stream.end
-progress :: IO () -> Integer -> BytesProcessed -> IO ()
-progress jsonbuffer size bytesprocessed = do
- B.hPut stdout $ Stream.start $ Stream.AesonObject o
- putStr ",\"action\":"
- jsonbuffer
- B.hPut stdout $ Stream.end
- B.hPut stdout $ Stream.end
+progress :: B.ByteString -> Integer -> BytesProcessed -> IO ()
+progress jsonbuffer size bytesprocessed = emit $ B.concat
+ [ Stream.start $ Stream.AesonObject o
+ , ",\"action\":"
+ , jsonbuffer
+ , "}"
+ , Stream.end
+ ]
where
n = fromBytesProcessed bytesprocessed :: Integer
Object o = object
diff --git a/Messages/Progress.hs b/Messages/Progress.hs
index a48e7b07e..39709a211 100644
--- a/Messages/Progress.hs
+++ b/Messages/Progress.hs
@@ -11,7 +11,6 @@ module Messages.Progress where
import Common
import Messages
-import Messages.Internal
import Utility.Metered
import Types
import Types.Messages
@@ -59,7 +58,7 @@ metered othermeter key a = case keySize key of
#endif
go _ (MessageState { outputType = JSONOutput False }) = nometer
go size (MessageState { outputType = JSONOutput True }) = do
- buf <- withMessageState $ return . showJSONBuffer
+ buf <- withMessageState $ return . jsonBuffer
m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $
JSON.progress buf size
a (combinemeter m)
@@ -93,7 +92,7 @@ concurrentMeteredFile file combinemeterupdate key a =
{- Progress dots. -}
showProgressDots :: Annex ()
-showProgressDots = outputMessage q "."
+showProgressDots = outputMessage JSON.none "."
{- Runs a command, that may output progress to either stdout or
- stderr, as well as other messages.