diff options
Diffstat (limited to 'Assistant/Types')
-rw-r--r-- | Assistant/Types/NetMessager.hs | 49 |
1 files changed, 36 insertions, 13 deletions
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 091d12815..c036d624a 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -14,7 +14,7 @@ import Data.Text (Text) import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) -import Data.Set as S +import qualified Data.Set as S {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -47,32 +47,55 @@ data PushStage | ReceivePackDone ExitCode deriving (Show, Eq, Ord) -data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID - deriving (Eq) - +{- Things that initiate either side of a push, but do not actually send data. -} isPushInitiation :: PushStage -> Bool isPushInitiation CanPush = True isPushInitiation PushRequest = True isPushInitiation StartingPush = True isPushInitiation _ = False +data PushSide = SendPack | ReceivePack + deriving (Eq, Ord) + +pushDestinationSide :: PushStage -> PushSide +pushDestinationSide CanPush = ReceivePack +pushDestinationSide PushRequest = SendPack +pushDestinationSide StartingPush = ReceivePack +pushDestinationSide (ReceivePackOutput _) = SendPack +pushDestinationSide (SendPackOutput _) = ReceivePack +pushDestinationSide (ReceivePackDone _) = SendPack + +type SideMap a = PushSide -> a + +mkSideMap :: STM a -> IO (SideMap a) +mkSideMap gen = do + (sp, rp) <- atomically $ (,) <$> gen <*> gen + return $ lookupside sp rp + where + lookupside sp _ SendPack = sp + lookupside _ rp ReceivePack = rp + +getSide :: PushSide -> SideMap a -> a +getSide side m = m side + data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan (NetMessage) - -- only one push can be running at a time, and this tracks it - , netMessagerPushRunning :: TMVar (PushRunning) - -- incoming messages relating to the currently running push - , netMessagesPush :: TChan (NetMessage) - -- incoming push messages that have been deferred to be processed later - , netMessagesDeferredPush :: TMVar (S.Set NetMessage) -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () + -- only one side of a push can be running at a time + , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID)) + -- incoming messages related to a running push + , netMessagesPush :: SideMap (TChan NetMessage) + -- incoming push messages, deferred to be processed later + , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage)) } newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan - <*> atomically (newTMVar NoPushRunning) - <*> atomically newTChan - <*> atomically (newTMVar S.empty) <*> newEmptySV + <*> mkSideMap (newTMVar Nothing) + <*> mkSideMap newTChan + <*> mkSideMap (newTMVar S.empty) + where |