summaryrefslogtreecommitdiff
path: root/Assistant/Types/NetMessager.hs
blob: 525ff29f20464970bad1ad00c722d3e90f36965a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
{- git-annex assistant out of band network messager types
 -
 - Copyright 2012 Joey Hess <joey@kitenet.net>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module Assistant.Types.NetMessager where

import Common.Annex
import Assistant.Pairing
import Git.Types

import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D

{- Messages that can be sent out of band by a network messager. -}
data NetMessage 
	-- indicate that pushes have been made to the repos with these uuids
	= NotifyPush [UUID]
	-- requests other clients to inform us of their presence
	| QueryPresence
	-- notification about a stage in the pairing process,
	-- involving a client, and a UUID.
	| PairingNotification PairStage ClientID UUID
	-- used for git push over the network messager
	| Pushing ClientID PushStage
	deriving (Show, Eq, Ord)

{- Something used to identify the client, or clients to send the message to. -}
type ClientID = Text

data PushStage
	-- indicates that we have data to push over the out of band network
	= CanPush UUID [Sha]
	-- request that a git push be sent over the out of band network
	| PushRequest UUID
	-- indicates that a push is starting
	| StartingPush UUID
	-- a chunk of output of git receive-pack
	| ReceivePackOutput SequenceNum ByteString
	-- a chuck of output of git send-pack
	| SendPackOutput SequenceNum ByteString
	-- sent when git receive-pack exits, with its exit code
	| ReceivePackDone ExitCode
	deriving (Show, Eq, Ord)

{- A sequence number. Incremented by one per packet in a sequence,
 - starting with 1 for the first packet. 0 means sequence numbers are
 - not being used. -}
type SequenceNum = Int

{- NetMessages that are important (and small), and should be stored to be
 - resent when new clients are seen. -}
isImportantNetMessage :: NetMessage -> Maybe ClientID
isImportantNetMessage (Pushing c (CanPush _ _)) = Just c
isImportantNetMessage (Pushing c (PushRequest _)) = Just c
isImportantNetMessage _ = Nothing

readdressNetMessage :: NetMessage -> ClientID -> NetMessage
readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid
readdressNetMessage (Pushing _ stage) c = Pushing c stage
readdressNetMessage m _ = m

{- Convert a NetMessage to something that can be logged. -}
logNetMessage :: NetMessage -> String
logNetMessage (Pushing c stage) = show $ Pushing (logClientID c) $
	case stage of
		ReceivePackOutput n _ -> ReceivePackOutput n elided
		SendPackOutput n _ -> SendPackOutput n elided
		s -> s
  where
  	elided = B8.pack "<elided>"
logNetMessage (PairingNotification stage c uuid) =
	show $ PairingNotification stage (logClientID c) uuid
logNetMessage m = show m

logClientID :: ClientID -> ClientID
logClientID c = T.concat [T.take 1 c, T.pack $ show $ T.length c]

{- Things that initiate either side of a push, but do not actually send data. -}
isPushInitiation :: PushStage -> Bool
isPushInitiation (PushRequest _) = True
isPushInitiation (StartingPush _) = True
isPushInitiation _ = False

isPushNotice :: PushStage -> Bool
isPushNotice (CanPush _ _) = True
isPushNotice _ = False

data PushSide = SendPack | ReceivePack
	deriving (Eq, Ord, Show)

pushDestinationSide :: PushStage -> PushSide
pushDestinationSide (CanPush _ _) = ReceivePack
pushDestinationSide (PushRequest _) = SendPack
pushDestinationSide (StartingPush _) = ReceivePack
pushDestinationSide (ReceivePackOutput _ _) = SendPack
pushDestinationSide (SendPackOutput _ _) = ReceivePack
pushDestinationSide (ReceivePackDone _) = SendPack

type SideMap a = PushSide -> a

mkSideMap :: STM a -> IO (SideMap a)
mkSideMap gen = do
	(sp, rp) <- atomically $ (,) <$> gen <*> gen
	return $ lookupside sp rp
  where
	lookupside sp _ SendPack = sp
	lookupside _ rp ReceivePack = rp

getSide :: PushSide -> SideMap a -> a
getSide side m = m side

type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))

data NetMessager = NetMessager
	-- outgoing messages
	{ netMessages :: TChan NetMessage
	-- important messages for each client
	, importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
	-- important messages that are believed to have been sent to a client
	, sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
	-- write to this to restart the net messager
	, netMessagerRestart :: MSampleVar ()
	-- only one side of a push can be running at a time
	-- the TMVars are empty when nothing is running
	, netMessagerPushRunning :: SideMap (TMVar ClientID)
	-- number of threads trying to push to the same client 
	-- at the same time (either running, or waiting to run)
	, netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int))
	-- incoming messages containing data for a push,
	-- on a per-client and per-side basis
	, netMessagesInboxes :: SideMap Inboxes
	}

newNetMessager :: IO NetMessager
newNetMessager = NetMessager
	<$> atomically newTChan
	<*> atomically (newTMVar M.empty)
	<*> atomically (newTMVar M.empty)
	<*> newEmptySV
	<*> mkSideMap newEmptyTMVar
	<*> mkSideMap (newTVar M.empty)
	<*> mkSideMap (newTVar M.empty)