module RewindableIndex.Storable
  ( -- * State
    Config
  , memoryBufferSize
  , State
  , handle
  , config
  , emptyState
  , Storage
  , storage
  , events
  , cursor
  , getMemoryEvents
  , getEvents
  , filterWithQueryInterval
  , StorableEvent
  , StorablePoint
  , StorableQuery
  , StorableResult
  , StorableMonad
    -- * API
  , QueryInterval(..)
  , Buffered(..)
  , Queryable(..)
  , Resumable(..)
  , Rewindable(..)
  , HasPoint(..)
  , insert
  , insertMany
  , rewind
  , resume
  , query
  ) where

import Control.Applicative ((<|>))
import Control.Lens.Operators ((%~), (.~), (^.))
import Control.Lens.TH qualified as Lens
import Control.Monad.Primitive (PrimMonad, PrimState)
import Data.Foldable (foldlM)
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Vector qualified as V
import Data.Vector.Generic qualified as VG
import Data.Vector.Mutable qualified as VM
import GHC.Generics (Generic)

{-
   The extensible parts of the indexers are the way data is stored into some form
   of persistent storage. The interface used for working with blockchain events is
   defined in this module.

   There are quite a few type variables involved in defining this interface.
   Some of them stand for the type of database connection (h), the type of events (e),
   the type of points along the blockchain (denoted by p), the type of queries (q) and
   results.

   The following data families implement the observation that most of the variables
   can be derived from the way information is stored in the database, so there is
   quite a bit of convenience in reducing the number of type variables to just two.
   One that stands for the database connection (which should be a newtype for each
   indexer) and one for the monad in which the indexer runs (usually IO).
-}
-- The type of events handled by the indexer whose database connection is `h`.
data family StorableEvent h

-- The type of points along the blockchain used by the indexer `h`.
type family StorablePoint h

-- The type of queries (requests) understood by the indexer `h`.
data family StorableQuery h

-- The type of results (responses) produced by queries against the indexer `h`.
data family StorableResult h

-- The monad in which the indexer `h` runs (usually IO).
type family StorableMonad h :: * -> *

{-
   Query intervals are a necessary tool to make the queries a little safer. As we can
   assume that there will be multiple concurrent indexers running, there is no guarantee
   that they are all synchronised up to the same block, so specifying a query validity
   interval will ensure that the queried data is acceptably synchronised across all
   queried indexers.
-}
-- The validity window attached to a query, expressed in points of type `p`.
--
--   QEverything          places no restriction on the events considered.
--   QInterval start end  restricts the query to the window delimited by the two
--                        points (see `filterWithQueryInterval` for how the bounds
--                        are applied to the in-memory events).
data QueryInterval p =
    QEverything
  | QInterval p p
  deriving (Show, Eq, Generic)

{-
   The first, `Buffered` class explains what it means for an indexer to be accumulating
   a set of results that will be flushed to disk when the memory buffer is fully filled.

   The memory layout of the indexer has been simplified by a lot since the previous
   version and it looks something like this:

   |e|e|e|e|e|e|e|e|e|e|e|e|e|e|e|e|e|e|                                     |
   |-------------------|---------------|-------------------------------------|
   |   memory/buffer   |  disk/events  |       disk/aggregate (optional)     |

   When the buffer is filled with events, they all get flushed to disk. We need this
   operation for performance reasons. Most databases will have significantly improved
   performance for batch inserts.

   Rollbacks will only happen in the memory or disk/events storage area, so the
   developer has to make sure that the allocated space for the buffer and the
   disk/events is greater than the K parameter.

   The last part disk/aggregate represents some aggregated data. At some point the
   developer should write code to aggregate the disk/events into the aggregate
   section of the database. We did not provide any special handling of this for the
   following reasons:

     1) A lot of indexers will not require aggregated data. Only storing events should
        be enough for most applications.
     2) Data aggregation can be implemented at database level and probably be more
        efficient than what Haskell can do here.
     3) We can always extend the interface to include direct support for this pattern
        if there is a demand from the users of the API.
-}
-- Indexers that accumulate events in a memory buffer and flush them to disk.
class Buffered h where
  -- Persists the given memory/buffer events to disk through the handle and returns
  -- the handle to use from then on (`flushBuffer` stores it back into the state).
  persistToStorage :: Foldable f => f (StorableEvent h) -> h -> StorableMonad h h

  {- Retrieves the events from the disk/events area.
     If the user chooses to only store events, without accumulating them, this function
     is expected to return the events over which rollbacks can occur in order to keep
     things performant. `getEvents` places these events before the buffered ones. -}
  getStoredEvents :: h -> StorableMonad h [StorableEvent h]

{-
   All information from indexers should be made accessible through queries. Queries
   act a lot like folds over the event stream. Each query introduces two indexed types:
   one for the type of requests (called StorableQuery) and one for responses (called
   StorableResult).

   All queries include a validity interval. If the data is not available for a specified
   interval, the returned result should specify that. It is also recommended that the result
   includes the slot number at which the query was run.
-}
-- Indexers that can answer queries.
class Queryable h where
  -- Runs a query against the indexer.
  --
  -- Arguments, in order: the validity interval of the query, a collection of
  -- events supplied by the caller (`query` passes the buffered in-memory events
  -- already filtered by that interval), the database handle, and the query itself.
  queryStorage
    :: Foldable f
    => QueryInterval (StorablePoint h)
    -> f (StorableEvent h)
    -> h
    -> StorableQuery h
    -> StorableMonad h (StorableResult h)

{-
   One of the reasons why indexers were born was to solve one of the issues very specific
   to blockchains: rollbacks. The basic idea was simple, keep a history of the blockchain
   data and whenever we have a rollback restore the version which is the target of the
   rollback.

   This is what the next class is meant to do.
-}
-- Indexers whose persistent storage can be rolled back.
class Rewindable h where
  -- Rewinds the storage behind the handle to the given point. `rewind` treats a
  -- `Nothing` result as "the storage could not be rewound to this point" and a
  -- `Just` result as the handle to continue with.
  rewindStorage :: StorablePoint h -> h -> StorableMonad h (Maybe h)

{-
   Another feature of indexers is the ability to resume from a previously stored event.
   One way of implementing this is to make sure that there is always at least one event
   present on disk, an event that includes the slot number at which it was generated.

   This function should return the most recent resume point. The plan is to support
   multiple resume points, but for now there is no way to ensure that no data duplication
   happens (the same events get reinserted).

   This will not be a problem if the indexer learns how to remove the `old` slots
   so there will be no duplication. This may be implemented later if the API users
   request it.
-}
-- Indexers that can report where to resume from.
class Resumable h where
  -- Returns the resume points known to the storage behind the handle.
  -- NOTE(review): the result is a list, while the surrounding commentary speaks of
  -- "the most recent resume point" - confirm the expected ordering and length.
  resumeFromStorage :: h -> StorableMonad h [StorablePoint h]

{-
   The next class is witnessing the fact that events contain enough information to
   retrieve the point when they were produced.
-}
-- Events of type `e` carry enough information to recover a point of type `p`.
class HasPoint e p where
  -- Extracts the point at which the event was produced.
  getPoint :: e -> p

{-
   The configuration includes hints about the number of events stored in memory and
   on disk. This information is used by the storage engine to decide how much memory
   to allocate and when to flush the memory buffer or roll the disk events into
   an aggregate database structure.
-}
-- Storage engine configuration.
newtype Config = Config
  { -- Number of events the memory buffer holds. `emptyState` allocates a buffer of
    -- this size and `flushBuffer` persists the buffer once the cursor reaches it.
    _memoryBufferSize :: Int
  } deriving (Show, Eq)
-- Generates the `memoryBufferSize` optic for the field above.
$(Lens.makeLenses ''Config)

-- The in-memory event buffer of an indexer.
data Storage h = Storage
  { -- Mutable vector backing the buffer; only the first `_cursor` slots hold
    -- events that are currently considered part of the buffer.
    _events :: VM.MVector (PrimState (StorableMonad h)) (StorableEvent h)
    -- Index of the next free slot, i.e. the number of buffered events.
  , _cursor :: Int
  }
-- Generates the `events` and `cursor` lenses.
$(Lens.makeLenses ''Storage)

-- Complete indexer state: configuration, memory buffer and database handle.
data State h = State
  { -- Storage engine configuration (memory buffer size).
    _config  :: Config
    -- In-memory event buffer.
  , _storage :: Storage h
    -- Database connection / handle used for everything that lives on disk.
  , _handle  :: h
  }
-- Generates the `config`, `storage` and `handle` lenses.
$(Lens.makeLenses ''State)

-- Creates an indexer state with an empty memory buffer.
--
-- The first argument is the memory buffer size: a mutable vector of that many
-- slots is allocated and the same number is recorded in the configuration. The
-- second argument is the database handle. The cursor starts at 0.
--
-- NOTE(review): with a size of 0 there is no slot for `appendEvent` to write
-- into; callers presumably always pass a positive size - confirm.
emptyState
  :: PrimMonad (StorableMonad h)
  => Int
  -> h
  -> StorableMonad h (State h)
emptyState memBuf hdl = do
  v <- VM.new memBuf
  pure $ State
    { _config  = Config { _memoryBufferSize = memBuf }
    , _storage = Storage { _events = v, _cursor = 0 }
    , _handle  = hdl
    }

-- Get events from the memory buffer.
--
-- Returns the slice of the backing vector that is in use, i.e. the first `cursor`
-- slots. The slice is a view of the same mutable vector, not a copy.
getMemoryEvents
  :: Storage h
  -> V.MVector (PrimState (StorableMonad h)) (StorableEvent h)
getMemoryEvents s = VM.slice 0 (s ^. cursor) (s ^. events)

-- Get events from memory buffer and disk buffer.
--
-- The buffered events are copied out of the mutable vector (`V.freeze`) and the
-- stored events are fetched through `getStoredEvents`; the result lists the disk
-- events first, followed by the memory events.
getEvents
  :: Buffered h
  => PrimMonad (StorableMonad h)
  => State h
  -> StorableMonad h [StorableEvent h]
getEvents s = do
  memoryEs <- getMemoryEvents (s ^. storage)
              & V.freeze <&> V.toList
  diskEs   <- getStoredEvents (s ^. handle)
  pure $ diskEs ++ memoryEs

-- Inserts one event into the indexer.
--
-- The buffer is flushed first (only if it is full, see `flushBuffer`) and the
-- event is then appended to the memory buffer. A full buffer is therefore
-- persisted when the next event arrives, not when the last slot is filled.
insert
  :: Buffered h
  => PrimMonad (StorableMonad h)
  => StorableEvent h
  -> State h
  -> StorableMonad h (State h)
insert e s = do
  state'   <- flushBuffer s
  storage' <- appendEvent e (state' ^. storage)
  pure $ state' { _storage = storage' }

-- Writes the event into the slot designated by the cursor and advances the
-- cursor by one.
--
-- No capacity check is made here: `insert` calls `flushBuffer` beforehand, which
-- resets the cursor once it reaches the configured buffer size.
appendEvent
  :: PrimMonad (StorableMonad h)
  => StorableEvent h
  -> Storage h
  -> StorableMonad h (Storage h)
appendEvent e s = do
  let cr = s ^. cursor
  VM.write (s ^. events) cr e
  pure $ s & cursor %~ (+ 1)

-- Persists the memory buffer to disk when it is full.
--
-- "Full" means that the cursor equals the configured memory buffer size. In that
-- case the buffered events are copied out, handed to `persistToStorage`, the
-- cursor is reset to 0 and the handle returned by the storage layer replaces the
-- old one. Otherwise the state is returned unchanged.
flushBuffer
  :: Buffered h
  => PrimMonad (StorableMonad h)
  => State h
  -> StorableMonad h (State h)
flushBuffer s = do
  let cr = s ^. storage . cursor
      es = getMemoryEvents $ s ^. storage
      mx = s ^. config . memoryBufferSize
  if mx == cr
  then do
    -- Freeze takes a copy, so the slots can safely be overwritten afterwards.
    v  <- V.freeze es
    h' <- persistToStorage v (s ^. handle)
    pure $ s & storage . cursor .~ 0
             & handle .~ h'
  else pure s

-- Inserts a collection of events one by one, left to right, threading the state
-- through `insert` (so the buffer may be flushed several times along the way).
insertMany
  :: Foldable f
  => Buffered h
  => PrimMonad (StorableMonad h)
  => f (StorableEvent h)
  -> State h
  -> StorableMonad h (State h)
insertMany es s =
  foldlM (\s' e -> insert e s') s es

-- Rolls the indexer back to the given point.
--
-- The memory buffer is searched first. If an event produced at the point is found
-- there, the cursor is moved just past it and the disk is left untouched. Only
-- when the point is not in memory is `rewindStorage` asked to rewind the disk; on
-- success the memory buffer is emptied and the new handle is stored. The result
-- is `Nothing` when neither the memory nor the storage could be rewound.
rewind
  :: forall h.
     Rewindable h
  => HasPoint (StorableEvent h) (StorablePoint h)
  => PrimMonad (StorableMonad h)
  => Eq (StorablePoint h)
  => StorablePoint h
  -> State h
  -> StorableMonad h (Maybe (State h))
rewind p s = do
  m' <- rewindMemory
  -- If the rollback point is in memory then we don't need to rewind the disk, so
  -- the storage is only consulted when the in-memory rewind failed. If we need to
  -- rewind to some point stored on disk, then the memory needs to be reset.
  h' <- case m' of
    Just _  -> pure Nothing
    Nothing -> rewindStorage p (s ^. handle)
  pure $ m' <|> resetMemory s <$> h'
  where
    -- Looks for the point among the buffered events and, if present, keeps the
    -- events up to and including the first one produced at that point.
    -- NOTE(review): `findIndex` returns the first match, so if several buffered
    -- events share the point the later ones are dropped - confirm this is intended.
    rewindMemory :: StorableMonad h (Maybe (State h))
    rewindMemory = do
      v <- V.freeze $ VM.slice 0 (s ^. storage . cursor) (s ^. storage . events)
      pure $ do
        ix <- VG.findIndex (\e -> getPoint e == p) v
        pure $ s & storage . cursor .~ (ix + 1)
    -- Installs the handle returned by the storage layer and empties the buffer.
    resetMemory :: State h -> h -> State h
    resetMemory s' h =
      s' & handle .~ h
         & storage . cursor .~ 0

-- Returns the resume points reported by the storage layer for the state's handle.
-- The memory buffer is not consulted.
resume
  :: Resumable h
  => State h
  -> StorableMonad h [StorablePoint h]
resume s = resumeFromStorage (s ^. handle)

{-
   This function is a bit non-trivial to think about. The question it is trying to
   answer is: Which events are valid for a given query interval.

   The answer is quite a bit more complicated than it seems at first sight. We would
   like to select the latest event within the interval and everything that goes
   before it (so we get a proper history). If we can't find any event less than the
   end, that means that the query interval has filtered all existing events.

   This functionality is important, because it should also be implemented at the
   database level to filter the on-disk events.
-}
-- Selects the events that are valid for a query interval.
--
-- `QEverything` returns the events unchanged. For `QInterval start end` the
-- longest prefix of events whose points are `<= end` is taken; that prefix is
-- returned only if its last event has a point `>= start`, otherwise the result
-- is empty.
--
-- NOTE(review): because a prefix is taken, the events are presumably expected to
-- be ordered by ascending point - confirm against the callers.
filterWithQueryInterval
  :: forall h.
     HasPoint (StorableEvent h) (StorablePoint h)
  => Ord (StorablePoint h)
  => QueryInterval (StorablePoint h)
  -> [StorableEvent h]
  -> [StorableEvent h]
filterWithQueryInterval QEverything es = es
filterWithQueryInterval (QInterval start end) es =
  case takeWhile ((<= end) . pointOf) es of
    -- The pattern guarantees a non-empty list, so `last` cannot fail here.
    es'@(_ : _) | pointOf (last es') >= start -> es'
    _                                         -> []
  where
    -- Pins the point type, which `HasPoint` alone does not determine.
    pointOf :: StorableEvent h -> StorablePoint h
    pointOf = getPoint

-- Runs a query against the indexer.
--
-- The buffered in-memory events are copied out, filtered with
-- `filterWithQueryInterval`, and passed to `queryStorage` together with the
-- interval, the handle and the query. Events already on disk are not loaded
-- here; they are left to the `Queryable` instance.
query
  :: HasPoint (StorableEvent h) (StorablePoint h)
  => Ord (StorablePoint h)
  => Queryable h
  => PrimMonad (StorableMonad h)
  => QueryInterval (StorablePoint h)
  -> State h
  -> StorableQuery h
  -> StorableMonad h (StorableResult h)
query qi s q = do
  es <- getMemoryEvents (s ^. storage) & V.freeze <&> V.toList
  queryStorage qi (filterWithQueryInterval qi es) (s ^. handle) q