module Marconi.Bootstrap where
import Control.Concurrent.STM (atomically)
import Control.Exception (catch)
import Control.Lens ((^.))
import Data.List.NonEmpty (fromList, nub)
import Data.Text (pack)
import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>))
import Prettyprinter.Render.Text (renderStrict)
import Cardano.Api (AsType (AsShelleyAddress), ChainPoint (ChainPointAtGenesis), deserialiseFromBech32)
import Cardano.BM.Setup (withTrace)
import Cardano.BM.Trace (logError)
import Cardano.BM.Tracing (defaultConfigStdout)
import Cardano.Streaming (ChainSyncEventException (NoIntersectionFound), withChainSyncEventStream)
import Marconi.Api.HttpServer qualified as Http
import Marconi.Api.Types (CliArgs (CliArgs), HasDBQueryEnv (queryTMVar), HasJsonRpcEnv (queryEnv),
JsonRpcEnv (JsonRpcEnv, _httpSettings, _queryEnv), RpcPortNumber, TargetAddresses,
UtxoQueryTMVar (UtxoQueryTMVar))
import Marconi.Api.UtxoIndexersQuery qualified as QApi
import Marconi.Indexers (mkIndexerStream, startIndexers, utxoWorker)
import Network.Wai.Handler.Warp (defaultSettings, setPort)
bootstrapJsonRpc
:: Maybe RpcPortNumber
-> TargetAddresses
-> IO JsonRpcEnv
bootstrapJsonRpc :: Maybe RpcPortNumber -> TargetAddresses -> IO JsonRpcEnv
bootstrapJsonRpc Maybe RpcPortNumber
maybePort TargetAddresses
targetAddresses = do
DBQueryEnv
queryenv <- TargetAddresses -> IO DBQueryEnv
QApi.bootstrap TargetAddresses
targetAddresses
let httpsettings :: Settings
httpsettings = Settings
-> (RpcPortNumber -> Settings) -> Maybe RpcPortNumber -> Settings
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Settings
defaultSettings ((RpcPortNumber -> Settings -> Settings)
-> Settings -> RpcPortNumber -> Settings
forall a b c. (a -> b -> c) -> b -> a -> c
flip RpcPortNumber -> Settings -> Settings
setPort Settings
defaultSettings ) Maybe RpcPortNumber
maybePort
JsonRpcEnv -> IO JsonRpcEnv
forall (f :: * -> *) a. Applicative f => a -> f a
pure (JsonRpcEnv -> IO JsonRpcEnv) -> JsonRpcEnv -> IO JsonRpcEnv
forall a b. (a -> b) -> a -> b
$ JsonRpcEnv :: Settings -> DBQueryEnv -> JsonRpcEnv
JsonRpcEnv
{ _httpSettings :: Settings
_httpSettings = Settings
httpsettings
, _queryEnv :: DBQueryEnv
_queryEnv = DBQueryEnv
queryenv
}
bootstrapHttp
:: JsonRpcEnv
-> IO ()
bootstrapHttp :: JsonRpcEnv -> IO ()
bootstrapHttp = JsonRpcEnv -> IO ()
Http.bootstrap
bootstrapUtxoIndexers
:: CliArgs
-> JsonRpcEnv
-> IO ()
bootstrapUtxoIndexers :: CliArgs -> JsonRpcEnv -> IO ()
bootstrapUtxoIndexers (CliArgs FilePath
socket FilePath
dbPath Maybe RpcPortNumber
_ NetworkId
networkId TargetAddresses
targetAddresses) JsonRpcEnv
env = do
let (UtxoQueryTMVar TMVar UtxoIndex
qTMVar) = JsonRpcEnv
env JsonRpcEnv
-> Getting UtxoQueryTMVar JsonRpcEnv UtxoQueryTMVar
-> UtxoQueryTMVar
forall s a. s -> Getting a s a -> a
^. (DBQueryEnv -> Const UtxoQueryTMVar DBQueryEnv)
-> JsonRpcEnv -> Const UtxoQueryTMVar JsonRpcEnv
forall c. HasJsonRpcEnv c => Lens' c DBQueryEnv
queryEnv ((DBQueryEnv -> Const UtxoQueryTMVar DBQueryEnv)
-> JsonRpcEnv -> Const UtxoQueryTMVar JsonRpcEnv)
-> ((UtxoQueryTMVar -> Const UtxoQueryTMVar UtxoQueryTMVar)
-> DBQueryEnv -> Const UtxoQueryTMVar DBQueryEnv)
-> Getting UtxoQueryTMVar JsonRpcEnv UtxoQueryTMVar
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (UtxoQueryTMVar -> Const UtxoQueryTMVar UtxoQueryTMVar)
-> DBQueryEnv -> Const UtxoQueryTMVar DBQueryEnv
forall c. HasDBQueryEnv c => Lens' c UtxoQueryTMVar
queryTMVar
callbackIndexer :: QApi.UtxoIndex -> IO QApi.UtxoIndex
callbackIndexer :: UtxoIndex -> IO UtxoIndex
callbackIndexer UtxoIndex
index = STM UtxoIndex -> IO UtxoIndex
forall a. STM a -> IO a
atomically (STM UtxoIndex -> IO UtxoIndex) -> STM UtxoIndex -> IO UtxoIndex
forall a b. (a -> b) -> a -> b
$ TMVar UtxoIndex -> UtxoIndex -> STM ()
forall a. TMVar a -> a -> STM ()
QApi.writeTMVar TMVar UtxoIndex
qTMVar UtxoIndex
index STM () -> STM UtxoIndex -> STM UtxoIndex
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> UtxoIndex -> STM UtxoIndex
forall (f :: * -> *) a. Applicative f => a -> f a
pure UtxoIndex
index
([ChainPoint]
_, Coordinator
coordinator) <-
[(Worker, FilePath)] -> IO ([ChainPoint], Coordinator)
startIndexers [( (UtxoIndex -> IO UtxoIndex) -> Maybe TargetAddresses -> Worker
utxoWorker UtxoIndex -> IO UtxoIndex
callbackIndexer (TargetAddresses -> Maybe TargetAddresses
forall a. a -> Maybe a
Just TargetAddresses
targetAddresses)
, FilePath
dbPath )]
let indexers :: Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
indexers = Coordinator
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
forall r.
Coordinator
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
mkIndexerStream Coordinator
coordinator
chainPoint :: ChainPoint
chainPoint = ChainPoint
ChainPointAtGenesis
Configuration
c <- IO Configuration
defaultConfigStdout
Configuration -> Text -> (Trace IO Text -> IO ()) -> IO ()
forall (m :: * -> *) a t.
(MonadIO m, MonadMask m, ToJSON a, FromJSON a, ToObject a) =>
Configuration -> Text -> (Trace m a -> m t) -> m t
withTrace Configuration
c Text
"marconi-mamba" ((Trace IO Text -> IO ()) -> IO ())
-> (Trace IO Text -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Trace IO Text
trace ->
FilePath
-> NetworkId
-> [ChainPoint]
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO Any
-> IO ())
-> IO ()
forall r b.
FilePath
-> NetworkId
-> [ChainPoint]
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO b)
-> IO b
withChainSyncEventStream FilePath
socket NetworkId
networkId [ChainPoint
chainPoint] Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO Any
-> IO ()
forall r.
Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
indexers
IO () -> (ChainSyncEventException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` \ChainSyncEventException
NoIntersectionFound ->
Trace IO Text -> Text -> IO ()
forall (m :: * -> *) a. MonadIO m => Trace m a -> a -> m ()
logError Trace IO Text
trace (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$
SimpleDocStream Any -> Text
forall ann. SimpleDocStream ann -> Text
renderStrict (SimpleDocStream Any -> Text) -> SimpleDocStream Any -> Text
forall a b. (a -> b) -> a -> b
$
LayoutOptions -> Doc Any -> SimpleDocStream Any
forall ann. LayoutOptions -> Doc ann -> SimpleDocStream ann
layoutPretty LayoutOptions
defaultLayoutOptions (Doc Any -> SimpleDocStream Any) -> Doc Any -> SimpleDocStream Any
forall a b. (a -> b) -> a -> b
$
Doc Any
"No intersection found when looking for the chain point"
Doc Any -> Doc Any -> Doc Any
forall ann. Doc ann -> Doc ann -> Doc ann
<+> (FilePath -> Doc Any
forall a ann. Pretty a => a -> Doc ann
pretty (FilePath -> Doc Any)
-> (ChainPoint -> FilePath) -> ChainPoint -> Doc Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChainPoint -> FilePath
forall a. Show a => a -> FilePath
show (ChainPoint -> Doc Any) -> ChainPoint -> Doc Any
forall a b. (a -> b) -> a -> b
$ ChainPoint
chainPoint) Doc Any -> Doc Any -> Doc Any
forall a. Semigroup a => a -> a -> a
<> Doc Any
"."
Doc Any -> Doc Any -> Doc Any
forall ann. Doc ann -> Doc ann -> Doc ann
<+> Doc Any
"Please check the slot number and the block hash do belong to the chain"
targetAddressParser
:: String
-> TargetAddresses
targetAddressParser :: FilePath -> TargetAddresses
targetAddressParser =
TargetAddresses -> TargetAddresses
forall a. Eq a => NonEmpty a -> NonEmpty a
nub
(TargetAddresses -> TargetAddresses)
-> (FilePath -> TargetAddresses) -> FilePath -> TargetAddresses
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Address ShelleyAddr] -> TargetAddresses
forall a. [a] -> NonEmpty a
fromList
([Address ShelleyAddr] -> TargetAddresses)
-> (FilePath -> [Address ShelleyAddr])
-> FilePath
-> TargetAddresses
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either Bech32DecodeError [Address ShelleyAddr]
-> [Address ShelleyAddr]
forall e a. Show e => Either e a -> a
fromJustWithError
(Either Bech32DecodeError [Address ShelleyAddr]
-> [Address ShelleyAddr])
-> (FilePath -> Either Bech32DecodeError [Address ShelleyAddr])
-> FilePath
-> [Address ShelleyAddr]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (FilePath -> Either Bech32DecodeError (Address ShelleyAddr))
-> [FilePath] -> Either Bech32DecodeError [Address ShelleyAddr]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (Text -> Either Bech32DecodeError (Address ShelleyAddr)
deserializeToCardano (Text -> Either Bech32DecodeError (Address ShelleyAddr))
-> (FilePath -> Text)
-> FilePath
-> Either Bech32DecodeError (Address ShelleyAddr)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> Text
pack)
([FilePath] -> Either Bech32DecodeError [Address ShelleyAddr])
-> (FilePath -> [FilePath])
-> FilePath
-> Either Bech32DecodeError [Address ShelleyAddr]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> [FilePath]
words
where
deserializeToCardano :: Text -> Either Bech32DecodeError (Address ShelleyAddr)
deserializeToCardano = AsType (Address ShelleyAddr)
-> Text -> Either Bech32DecodeError (Address ShelleyAddr)
forall a.
SerialiseAsBech32 a =>
AsType a -> Text -> Either Bech32DecodeError a
deserialiseFromBech32 AsType (Address ShelleyAddr)
AsShelleyAddress
fromJustWithError :: (Show e) => Either e a -> a
fromJustWithError :: Either e a -> a
fromJustWithError Either e a
v = case Either e a
v of
Left e
e ->
FilePath -> a
forall a. HasCallStack => FilePath -> a
error (FilePath -> a) -> FilePath -> a
forall a b. (a -> b) -> a -> b
$ FilePath
"\n!!!\n Abnormal Termination with Error: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> e -> FilePath
forall a. Show a => a -> FilePath
show e
e FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
"\n!!!\n"
Right a
accounts -> a
accounts