aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-12-06 15:08:00 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-12-06 15:09:04 -0400
commit03a65c127403e731d7866ee3bbe397fcae7c7761 (patch)
tree0e802d73aa0f31bdb9573a92d644558c2aefce89
parent05c5822a7fababe816da579ac50d436fe19a6499 (diff)
finish implementation of Remote.P2P (untested)
Not tested at all, but it just might work. Only known problem is that progress is not updated when storing to a P2P remote. This commit was sponsored by Nick Daly on Patreon.
-rw-r--r--Remote/P2P.hs140
1 files changed, 116 insertions, 24 deletions
diff --git a/Remote/P2P.hs b/Remote/P2P.hs
index e0428eeeb..f97d76e71 100644
--- a/Remote/P2P.hs
+++ b/Remote/P2P.hs
@@ -11,14 +11,23 @@ module Remote.P2P (
) where
import Annex.Common
+import qualified Annex
+import qualified P2P.Protocol as P2P
import P2P.Address
+import P2P.Annex
import Types.Remote
import Types.GitConfig
import qualified Git
import Config
import Config.Cost
import Remote.Helper.Git
-import Remote.Helper.Special
+import Remote.Helper.Tor
+import Utility.Tor
+import Utility.Metered
+import Types.NumCopies
+
+import Control.Concurrent
+import Control.Concurrent.STM
remote :: RemoteType
remote = RemoteType {
@@ -32,18 +41,18 @@ remote = RemoteType {
chainGen :: P2PAddress -> Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
chainGen addr r u c gc = do
- workerpool <- mkWorkerPool addr
+ connpool <- mkConnectionPool
cst <- remoteCost gc expensiveRemoteCost
let this = Remote
{ uuid = u
, cost = cst
, name = Git.repoDescribe r
- , storeKey = storeKeyDummy
- , retrieveKeyFile = retreiveKeyFileDummy
+ , storeKey = store addr connpool
+ , retrieveKeyFile = retrieve addr connpool
, retrieveKeyFileCheap = \_ _ _ -> return False
- , removeKey = removeKeyDummy
- , lockContent = Nothing -- TODO use p2p protocol locking
- , checkPresent = checkPresentDummy
+ , removeKey = remove addr connpool
+ , lockContent = Just (lock u addr connpool)
+ , checkPresent = checkpresent addr connpool
, checkPresentCheap = False
, whereisKey = Nothing
, remoteFsck = Nothing
@@ -60,26 +69,109 @@ chainGen addr r u c gc = do
, claimUrl = Nothing
, checkUrl = Nothing
}
- return $ Just $ specialRemote' (specialRemoteCfg c) c
- (simplyPrepare $ store this workerpool)
- (simplyPrepare $ retrieve this workerpool)
- (simplyPrepare $ remove this workerpool)
- (simplyPrepare $ checkKey this workerpool)
- this
+ return (Just this)
+
+-- TODO update progress
+store :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
+store addr connpool k af p = fromMaybe False
+ <$> runProto addr connpool (P2P.put k af)
+
+retrieve :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
+retrieve addr connpool k af dest _p = unVerified $ fromMaybe False
+ <$> runProto addr connpool (P2P.get dest k af)
+
+remove :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
+remove addr connpool k = fromMaybe False
+ <$> runProto addr connpool (P2P.remove k)
+
+checkpresent :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
+checkpresent addr connpool k = maybe unavail return
+ =<< runProto addr connpool (P2P.checkPresent k)
+ where
+ unavail = giveup "can't connect to peer"
+
+lock :: UUID -> P2PAddress -> ConnectionPool -> Key -> (VerifiedCopy -> Annex r) -> Annex r
+lock theiruuid addr connpool k callback =
+ withConnection addr connpool $ \conn -> do
+ connv <- liftIO $ newMVar conn
+ let runproto d p = do
+ c <- liftIO $ takeMVar connv
+ (c', mr) <- runProto' p c
+ liftIO $ putMVar connv c'
+ return (fromMaybe d mr)
+ r <- P2P.lockContentWhile runproto k go
+ conn' <- liftIO $ takeMVar connv
+ return (conn', r)
+ where
+ go False = giveup "can't lock content"
+ go True = withVerifiedCopy LockedCopy theiruuid (return True) callback
+
+-- | A connection to the peer.
+data Connection
+ = TorAnnexConnection RunEnv
+ | ClosedConnection
+
+type ConnectionPool = TVar [Connection]
-data WorkerPool = WorkerPool
+mkConnectionPool :: Annex ConnectionPool
+mkConnectionPool = liftIO $ newTVarIO []
-mkWorkerPool :: P2PAddress -> Annex WorkerPool
-mkWorkerPool addr = undefined
+-- Runs the Proto action.
+runProto :: P2PAddress -> ConnectionPool -> P2P.Proto a -> Annex (Maybe a)
+runProto addr connpool a = withConnection addr connpool (runProto' a)
-store :: Remote -> WorkerPool -> Storer
-store r workerpool = undefined
+runProto' :: P2P.Proto a -> Connection -> Annex (Connection, Maybe a)
+runProto' _ ClosedConnection = return (ClosedConnection, Nothing)
+runProto' a conn@(TorAnnexConnection runenv) = do
+ r <- runFullProto Client runenv a
+ -- When runFullProto fails, the connection is no longer usable,
+ -- so close it.
+ if isJust r
+ then return (conn, r)
+ else do
+ liftIO $ hClose (runIhdl runenv)
+ return (ClosedConnection, r)
-retrieve :: Remote -> WorkerPool -> Retriever
-retrieve r workerpool = undefined
+-- Uses an open connection if one is available in the ConnectionPool;
+-- otherwise opens a new connection.
+--
+-- Once the action is done, the connection is added back to the
+-- ConnectionPool, unless it's no longer open.
+withConnection :: P2PAddress -> ConnectionPool -> (Connection -> Annex (Connection, a)) -> Annex a
+withConnection addr connpool a = bracketOnError get cache go
+ where
+ get = do
+ mc <- liftIO $ atomically $ do
+ l <- readTVar connpool
+ case l of
+ [] -> do
+ writeTVar connpool []
+ return Nothing
+ (c:cs) -> do
+ writeTVar connpool cs
+ return (Just c)
+ maybe (openConnection addr) return mc
+
+ cache ClosedConnection = return ()
+ cache conn = liftIO $ atomically $ modifyTVar' connpool (conn:)
-remove :: Remote -> WorkerPool -> Remover
-remove r workerpool k = undefined
+ go conn = do
+ (conn', r) <- a conn
+ cache conn'
+ return r
-checkKey :: Remote -> WorkerPool -> CheckPresent
-checkKey r workerpool k = undefined
+openConnection :: P2PAddress -> Annex Connection
+openConnection (TorAnnex onionaddress onionport) = do
+ v <- liftIO $ tryNonAsync $
+ torHandle =<< connectHiddenService onionaddress onionport
+ case v of
+ Right h -> do
+ g <- Annex.gitRepo
+ let runenv = RunEnv
+ { runRepo = g
+ , runCheckAuth = const False
+ , runIhdl = h
+ , runOhdl = h
+ }
+ return (TorAnnexConnection runenv)
+ Left _e -> return ClosedConnection