aboutsummaryrefslogtreecommitdiff
path: root/Remote
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-12-06 15:08:00 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-12-06 15:09:04 -0400
commit03a65c127403e731d7866ee3bbe397fcae7c7761 (patch)
tree0e802d73aa0f31bdb9573a92d644558c2aefce89 /Remote
parent05c5822a7fababe816da579ac50d436fe19a6499 (diff)
finish implementation of Remote.P2P (untested)
Not tested at all, but it just might work. Only known problem is that progress is not updated when storing to a P2P remote. This commit was sponsored by Nick Daly on Patreon.
Diffstat (limited to 'Remote')
-rw-r--r--Remote/P2P.hs140
1 files changed, 116 insertions, 24 deletions
diff --git a/Remote/P2P.hs b/Remote/P2P.hs
index e0428eeeb..f97d76e71 100644
--- a/Remote/P2P.hs
+++ b/Remote/P2P.hs
@@ -11,14 +11,23 @@ module Remote.P2P (
) where
import Annex.Common
+import qualified Annex
+import qualified P2P.Protocol as P2P
import P2P.Address
+import P2P.Annex
import Types.Remote
import Types.GitConfig
import qualified Git
import Config
import Config.Cost
import Remote.Helper.Git
-import Remote.Helper.Special
+import Remote.Helper.Tor
+import Utility.Tor
+import Utility.Metered
+import Types.NumCopies
+
+import Control.Concurrent
+import Control.Concurrent.STM
remote :: RemoteType
remote = RemoteType {
@@ -32,18 +41,18 @@ remote = RemoteType {
chainGen :: P2PAddress -> Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
chainGen addr r u c gc = do
- workerpool <- mkWorkerPool addr
+ connpool <- mkConnectionPool
cst <- remoteCost gc expensiveRemoteCost
let this = Remote
{ uuid = u
, cost = cst
, name = Git.repoDescribe r
- , storeKey = storeKeyDummy
- , retrieveKeyFile = retreiveKeyFileDummy
+ , storeKey = store addr connpool
+ , retrieveKeyFile = retrieve addr connpool
, retrieveKeyFileCheap = \_ _ _ -> return False
- , removeKey = removeKeyDummy
- , lockContent = Nothing -- TODO use p2p protocol locking
- , checkPresent = checkPresentDummy
+ , removeKey = remove addr connpool
+ , lockContent = Just (lock u addr connpool)
+ , checkPresent = checkpresent addr connpool
, checkPresentCheap = False
, whereisKey = Nothing
, remoteFsck = Nothing
@@ -60,26 +69,109 @@ chainGen addr r u c gc = do
, claimUrl = Nothing
, checkUrl = Nothing
}
- return $ Just $ specialRemote' (specialRemoteCfg c) c
- (simplyPrepare $ store this workerpool)
- (simplyPrepare $ retrieve this workerpool)
- (simplyPrepare $ remove this workerpool)
- (simplyPrepare $ checkKey this workerpool)
- this
+ return (Just this)
+
+-- TODO update progress
+store :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
+store addr connpool k af p = fromMaybe False
+ <$> runProto addr connpool (P2P.put k af)
+
+retrieve :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
+retrieve addr connpool k af dest _p = unVerified $ fromMaybe False
+ <$> runProto addr connpool (P2P.get dest k af)
+
+remove :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
+remove addr connpool k = fromMaybe False
+ <$> runProto addr connpool (P2P.remove k)
+
+checkpresent :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
+checkpresent addr connpool k = maybe unavail return
+ =<< runProto addr connpool (P2P.checkPresent k)
+ where
+ unavail = giveup "can't connect to peer"
+
+lock :: UUID -> P2PAddress -> ConnectionPool -> Key -> (VerifiedCopy -> Annex r) -> Annex r
+lock theiruuid addr connpool k callback =
+ withConnection addr connpool $ \conn -> do
+ connv <- liftIO $ newMVar conn
+ let runproto d p = do
+ c <- liftIO $ takeMVar connv
+ (c', mr) <- runProto' p c
+ liftIO $ putMVar connv c'
+ return (fromMaybe d mr)
+ r <- P2P.lockContentWhile runproto k go
+ conn' <- liftIO $ takeMVar connv
+ return (conn', r)
+ where
+ go False = giveup "can't lock content"
+ go True = withVerifiedCopy LockedCopy theiruuid (return True) callback
+
+-- | A connection to the peer.
+data Connection
+ = TorAnnexConnection RunEnv
+ | ClosedConnection
+
+type ConnectionPool = TVar [Connection]
-data WorkerPool = WorkerPool
+mkConnectionPool :: Annex ConnectionPool
+mkConnectionPool = liftIO $ newTVarIO []
-mkWorkerPool :: P2PAddress -> Annex WorkerPool
-mkWorkerPool addr = undefined
+-- Runs the Proto action.
+runProto :: P2PAddress -> ConnectionPool -> P2P.Proto a -> Annex (Maybe a)
+runProto addr connpool a = withConnection addr connpool (runProto' a)
-store :: Remote -> WorkerPool -> Storer
-store r workerpool = undefined
+runProto' :: P2P.Proto a -> Connection -> Annex (Connection, Maybe a)
+runProto' _ ClosedConnection = return (ClosedConnection, Nothing)
+runProto' a conn@(TorAnnexConnection runenv) = do
+ r <- runFullProto Client runenv a
+ -- When runFullProto fails, the connection is no longer usable,
+ -- so close it.
+ if isJust r
+ then return (conn, r)
+ else do
+ liftIO $ hClose (runIhdl runenv)
+ return (ClosedConnection, r)
-retrieve :: Remote -> WorkerPool -> Retriever
-retrieve r workerpool = undefined
+-- Uses an open connection if one is available in the ConnectionPool;
+-- otherwise opens a new connection.
+--
+-- Once the action is done, the connection is added back to the
+-- ConnectionPool, unless it's no longer open.
+withConnection :: P2PAddress -> ConnectionPool -> (Connection -> Annex (Connection, a)) -> Annex a
+withConnection addr connpool a = bracketOnError get cache go
+ where
+ get = do
+ mc <- liftIO $ atomically $ do
+ l <- readTVar connpool
+ case l of
+ [] -> do
+ writeTVar connpool []
+ return Nothing
+ (c:cs) -> do
+ writeTVar connpool cs
+ return (Just c)
+ maybe (openConnection addr) return mc
+
+ cache ClosedConnection = return ()
+ cache conn = liftIO $ atomically $ modifyTVar' connpool (conn:)
-remove :: Remote -> WorkerPool -> Remover
-remove r workerpool k = undefined
+ go conn = do
+ (conn', r) <- a conn
+ cache conn'
+ return r
-checkKey :: Remote -> WorkerPool -> CheckPresent
-checkKey r workerpool k = undefined
+openConnection :: P2PAddress -> Annex Connection
+openConnection (TorAnnex onionaddress onionport) = do
+ v <- liftIO $ tryNonAsync $
+ torHandle =<< connectHiddenService onionaddress onionport
+ case v of
+ Right h -> do
+ g <- Annex.gitRepo
+ let runenv = RunEnv
+ { runRepo = g
+ , runCheckAuth = const False
+ , runIhdl = h
+ , runOhdl = h
+ }
+ return (TorAnnexConnection runenv)
+ Left _e -> return ClosedConnection