bolty-streamly: Streamly streaming interface for bolty Neo4j driver

[ apache, database, library ]

Streaming queries over Neo4j using streamly streams. Wraps bolty's query functions with streamly's Stream type for lazy record-by-record consumption.



Flags

Manual Flags

Name  Description                                Default
dev   Turn on -Werror and other developer flags  Disabled

Use -f <flag> to enable a flag, or -f -<flag> to disable that flag.

Versions 0.1.0.0
Change log changelog.md
Dependencies base (>=4.18 && <5), bolty (>=0.1.0.2 && <0.2), packstream-bolt (>=0.1 && <0.2), streamly-core (>=0.2 && <0.3), text (>=2.0 && <2.2), unordered-containers (>=0.2 && <0.3), vector (>=0.13 && <0.14)
Tested with ghc ==9.6.7 || ==9.8.4 || ==9.10.3 || ==9.12.3
License Apache-2.0
Copyright (c) 2023-2026 philippedev101
Author philippedev101
Maintainer philippedev101@gmail.com
Uploaded by philippedev101 at 2026-03-07T14:31:33Z
Category Database
Home page https://github.com/philippedev101/bolty-streamly#readme
Bug tracker https://github.com/philippedev101/bolty-streamly/issues
Source repo head: git clone https://github.com/philippedev101/bolty-streamly
Downloads 6 total (6 in the last 30 days)
Rating (no votes yet) [estimated by Bayesian average]
Status Docs uploaded by user
Build status unknown [no reports yet]

Readme for bolty-streamly-0.1.0.0


bolty-streamly

Streaming interface for the bolty Neo4j driver, built on streamly.

Why streaming?

bolty's standard query functions buffer the entire result set into a Vector before returning. For large result sets (millions of rows, graph traversals, data exports), memory use therefore grows with the size of the result.

bolty-streamly yields records one at a time as they arrive from the server, so memory use stays constant regardless of result set size, provided the consumer does not retain the records itself. Records are pulled from Neo4j in batches via the BOLT protocol's PULL mechanism but exposed as a single Stream IO, so you never have to think about batching.
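To make the constant-memory point concrete, here is a minimal sketch of a single-pass consumer. It uses only streamly-core; the `rows` stream is a stand-in (an assumption for illustration, not part of bolty-streamly) for a query stream, so the example runs without a database.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)

-- Stand-in for a large query result (hypothetical; a real program
-- would obtain a stream from bolty-streamly instead).
rows :: Stream IO Int
rows = Stream.enumerateFromTo 1 1000000

-- Count the elements and sum them in a single pass. Each element is
-- handed to both folds and then dropped, so nothing is buffered.
countAndSum :: Stream IO Int -> IO (Int, Int)
countAndSum = Stream.fold (Fold.tee Fold.length Fold.sum)

main :: IO ()
main = do
  (n, total) <- countAndSum rows
  putStrLn $ "rows: " <> show n <> ", sum: " <> show total
```

A fold like `countAndSum` works on any `Stream IO a` of the right element type, so the same consumer should apply unchanged to a stream produced by one of the query functions described below.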

Quick start

    {-# LANGUAGE OverloadedStrings #-}

    import qualified Database.Bolty as Bolt
    import qualified Database.Bolty.Streamly as BoltS
    import qualified Streamly.Data.Stream as Stream
    import qualified Streamly.Data.Fold as Fold
    import Data.Default (def)
    -- Failure and Success are the constructors of the validation result
    -- returned by Bolt.validateConfig; they must also be in scope.

    main :: IO ()
    main = do
      let cfg = def{ Bolt.scheme = Bolt.Basic "neo4j" "password", Bolt.use_tls = False }
      case Bolt.validateConfig cfg of
        Failure _ -> error "bad config"
        Success vc -> do
          pool <- Bolt.createPool vc Bolt.defaultPoolConfig
          -- Building the stream does not contact the server yet
          let s = BoltS.poolStream pool "MATCH (n:Person) RETURN n.name AS name"
          -- Consuming the stream runs the query and counts the records
          count <- Stream.fold Fold.length s
          putStrLn $ "Processed " <> show count <> " records"
          Bolt.destroyPool pool

API overview

The Database.Bolty.Streamly module exposes four levels of streaming (direct connection, connection pool, routing pool, and session), each with variants for parameters (P) and typed decoding (As).

Streams from pool, routing, and session functions are ordinary values — you can store them, pass them around, and compose them freely. Connection lifetime is tied to the stream via bracketIO: the connection is acquired when consumption begins and released when the stream completes or errors.
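The acquire-on-consumption behaviour can be illustrated with streamly-core alone. The sketch below is not bolty-streamly's implementation: `resourceStream` is a hypothetical stand-in that records "acquire" and "release" events in an IORef instead of touching a real connection, so the ordering can be inspected.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)

-- Hypothetical stand-in for "acquire a connection, stream rows,
-- release it". Events are prepended to the log as they happen.
resourceStream :: IORef [String] -> Stream IO Int
resourceStream logRef =
  Stream.bracketIO
    (modifyIORef' logRef ("acquire" :))        -- runs when consumption starts
    (\_ -> modifyIORef' logRef ("release" :))  -- runs when the stream ends
    (\_ -> Stream.fromList [1, 2, 3])

main :: IO ()
main = do
  logRef <- newIORef []
  let s = resourceStream logRef       -- building the stream acquires nothing
  before <- readIORef logRef
  total <- Stream.fold Fold.sum s     -- consuming it acquires, then releases
  after <- readIORef logRef
  print (before, total, reverse after)
```

The log is empty until the fold runs, which is why a stream built this way can be stored or passed around without holding a resource.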

Direct connection

Use when you manage the connection yourself:

    -- Raw records
    queryStream  :: Connection -> Text -> IO (Stream IO Record)
    queryStreamP :: Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)

    -- Decoded records (throws DecodeError on failure)
    queryStreamAs  :: RowDecoder a -> Connection -> Text -> IO (Stream IO a)
    queryStreamPAs :: RowDecoder a -> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
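Unlike the pool, routing, and session functions, these return `IO (Stream IO a)`, so the stream has to be obtained by running an action first. If a plain `Stream IO a` is more convenient, one possible approach is to defer the action until the stream is consumed. The helper `deferStream` and the stand-in `fakeQuery` below are illustrative names, not part of bolty-streamly; with a real connection, `fakeQuery` would be replaced by something like `BoltS.queryStream conn "..."`.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)

-- Turn an action that produces a stream into a stream that runs the
-- action when it is first consumed.
deferStream :: IO (Stream IO a) -> Stream IO a
deferStream acquire = Stream.concatMap id (Stream.fromEffect acquire)

-- Stand-in for a direct-connection query (hypothetical).
fakeQuery :: IO (Stream IO Int)
fakeQuery = do
  putStrLn "RUN sent"
  pure (Stream.fromList [10, 20, 30])

main :: IO ()
main = do
  let s = deferStream fakeQuery     -- nothing has run yet
  xs <- Stream.fold Fold.toList s   -- "RUN sent" is printed here
  print xs
```

Since the caller owns the connection in this mode, it must still be open when the deferred stream is eventually consumed.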

Connection pool

Acquires a pooled connection when the stream is consumed and returns it when the stream finishes:

    poolStream    :: BoltPool -> Text -> Stream IO Record
    poolStreamP   :: BoltPool -> Text -> HashMap Text Ps -> Stream IO Record
    poolStreamAs  :: RowDecoder a -> BoltPool -> Text -> Stream IO a
    poolStreamPAs :: RowDecoder a -> BoltPool -> Text -> HashMap Text Ps -> Stream IO a

Example:

    pool <- Bolt.createPool vc Bolt.defaultPoolConfig
    let people = BoltS.poolStreamAs personDecoder pool "MATCH (p:Person) RETURN p.name, p.age"
    -- Print each decoded record as it arrives
    Stream.fold (Fold.drainMapM print) people
    Bolt.destroyPool pool
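For export-style workloads it is often handy to process records in fixed-size groups rather than one at a time. A minimal sketch using only streamly-core follows; `batchesOf` is a hypothetical helper and the `rows` stream is a stand-in for something like the `people` stream above.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)

-- Group a stream into lists of at most n elements. Only one batch
-- is held in memory at a time.
batchesOf :: Int -> Stream IO a -> Stream IO [a]
batchesOf n = Stream.foldMany (Fold.take n Fold.toList)

main :: IO ()
main = do
  -- Stand-in for a query stream.
  let rows = Stream.fromList [1 .. 7 :: Int]
  -- Print each batch; the last one may be shorter than n.
  Stream.fold (Fold.drainMapM print) (batchesOf 3 rows)
```

Each batch could equally be handed to a bulk writer or serializer instead of `print`.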

Routing pool (clusters)

Routes queries to the appropriate cluster member based on access mode:

    routingStream    :: RoutingPool -> AccessMode -> Text -> Stream IO Record
    routingStreamP   :: RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> Stream IO Record
    routingStreamAs  :: RowDecoder a -> RoutingPool -> AccessMode -> Text -> Stream IO a
    routingStreamPAs :: RowDecoder a -> RoutingPool -> AccessMode -> Text -> HashMap Text Ps -> Stream IO a

Example:

    let results = BoltS.routingStreamAs decoder routingPool ReadAccess "MATCH (n) RETURN n"
    items <- Stream.fold Fold.toList results
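The As variants are documented above as throwing DecodeError on failure. Presumably that exception surfaces while the stream is being consumed, in which case it can be caught around the fold. The sketch below assumes this and uses only base and streamly-core: `decoded` is a stand-in stream, and `ErrorCall` plays the role of bolty's DecodeError.

```haskell
import Control.Exception (ErrorCall (..), throwIO, try)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Stream (Stream)

-- Stand-in for a decoded stream: "decoding" fails on negative input.
decoded :: Stream IO Int
decoded = Stream.mapM decode (Stream.fromList [1, 2, -3, 4])
  where
    decode x
      | x < 0 = throwIO (ErrorCall ("cannot decode " <> show x))
      | otherwise = pure x

main :: IO ()
main = do
  -- The exception escapes from the fold, so wrap the fold in try.
  result <- try (Stream.fold Fold.sum decoded)
  case result of
    Left (ErrorCall msg) -> putStrLn ("stream aborted: " <> msg)
    Right total -> print total
```

Note that a failure partway through discards the partial fold result; if earlier records must be kept, they need to be written out as a side effect while streaming.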

Session (causal consistency)

Runs streaming queries inside managed transactions with automatic bookmark tracking and read/write routing:

    sessionReadStream    :: Session -> Text -> Stream IO Record
    sessionWriteStream   :: Session -> Text -> Stream IO Record
    sessionReadStreamAs  :: RowDecoder a -> Session -> Text -> Stream IO a
    sessionWriteStreamAs :: RowDecoder a -> Session -> Text -> Stream IO a
    -- + P variants for parameters

Example:

    session <- Bolt.createSession pool Bolt.defaultSessionConfig

    -- Write some data
    Stream.fold Fold.drain $ BoltS.sessionWriteStream session "CREATE (p:Person {name: 'Alice'})"

    -- Read it back (guaranteed to see Alice via bookmarks)
    Stream.fold (Fold.drainMapM print) $ BoltS.sessionReadStreamAs personDecoder session "MATCH (p:Person) RETURN p.name, p.age"

Low-level: pullStream

If you need to run a query with custom RUN parameters and then stream the PULL phase yourself:

    pullStream :: Connection -> IO (Stream IO Record)

This expects the connection to already be in Streaming or TXstreaming state (after a RUN has been acknowledged). It handles PULL batching and state transitions automatically.

Naming convention

Suffix  Meaning
(none)  No parameters, raw Record stream
P       With parameters (HashMap Text Ps)
As      Decoded via RowDecoder a, no parameters
PAs     Decoded via RowDecoder a, with parameters

Supported GHC versions

9.6.7, 9.8.4, 9.10.3, 9.12.3

License

Apache-2.0