Neo4j.Stream (Neo4jEx v0.1.4)

Streaming interface for large Neo4j result sets.

This module processes large result sets without loading every record into memory at once. It uses Elixir's Stream.resource/3 to build a lazy stream that fetches records in batches, paginating the query with Cypher's SKIP and LIMIT clauses.
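The batching approach described above can be illustrated with a minimal sketch. It is not the library's implementation: `PaginationSketch`, `fetch_page/3`, and the in-memory `rows` list are hypothetical stand-ins for a driver call that would run the query with `SKIP skip LIMIT limit`.

```elixir
defmodule PaginationSketch do
  # Hypothetical stand-in for a driver call: returns the rows that a query
  # with `SKIP skip LIMIT limit` would yield over an in-memory list.
  def fetch_page(rows, skip, limit) do
    rows |> Enum.drop(skip) |> Enum.take(limit)
  end

  # Builds a lazy stream that requests one page at a time.
  def stream(rows, batch_size) do
    Stream.resource(
      # start_fun: the initial SKIP offset
      fn -> 0 end,
      # next_fun: fetch one page, emit its rows, advance the offset
      fn
        :done ->
          {:halt, :done}

        skip ->
          case fetch_page(rows, skip, batch_size) do
            # An empty page means the result set is exhausted
            [] -> {:halt, skip}
            # A short page is the last one, so no further fetch is needed
            page when length(page) < batch_size -> {page, :done}
            page -> {page, skip + batch_size}
          end
      end,
      # after_fun: nothing to clean up in this sketch
      fn _acc -> :ok end
    )
  end
end
```

Because the stream is lazy, `PaginationSketch.stream(rows, 4) |> Enum.take(3)` fetches only the first page. With a real database, SKIP/LIMIT pagination generally needs an ORDER BY clause in the query, since Cypher does not otherwise guarantee a stable row order between pages.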

Functions

run(driver, query, params \\ %{}, opts \\ [])

Creates a stream for large result sets.

Parameters

  • driver: Driver process
  • query: Cypher query string
  • params: Query parameters map (default: %{})
  • opts: Query options (default: [])

Options

  • :batch_size - Number of records to fetch at once (default: 1000)
  • :timeout - Query timeout in milliseconds (default: 30000)
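One plausible way to resolve these options against their documented defaults is a keyword merge. This is a sketch only; `StreamOptsSketch` and `resolve/1` are hypothetical names, and the library may handle its options differently.

```elixir
defmodule StreamOptsSketch do
  # Defaults taken from the option list above
  @defaults [batch_size: 1000, timeout: 30_000]

  # Caller-supplied options override the defaults key by key
  def resolve(opts) when is_list(opts) do
    Keyword.merge(@defaults, opts)
  end
end
```

For example, `StreamOptsSketch.resolve(batch_size: 500)` keeps the default 30000 ms timeout while lowering the batch size to 500.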

Returns

Stream of records

Examples

# Basic streaming
# process_person/1 is not part of this module; it stands for the caller's own function
driver
|> Neo4j.Stream.run("MATCH (n:Person) RETURN n")
|> Stream.map(&process_person/1)
|> Stream.run()

# With custom batch size
# batch_process/1 stands for the caller's own function; it receives lists of up to 100 records
driver
|> Neo4j.Stream.run("MATCH (n:BigData) RETURN n", %{}, batch_size: 500)
|> Stream.chunk_every(100)
|> Enum.each(&batch_process/1)

# Memory-efficient aggregation
# get_field/2 is not defined in this module; it stands for a helper that reads a field from a record
total =
  driver
  |> Neo4j.Stream.run("MATCH (n:Transaction) RETURN n.amount")
  |> Stream.map(&get_field(&1, "n.amount"))
  |> Enum.sum()

run_with(driver, query, params \\ %{}, processor_fn, opts \\ [])

Creates a stream that applies a custom processing function to each record.

Parameters

  • driver: Driver process
  • query: Cypher query string
  • params: Query parameters map (default: %{})
  • processor_fn: Function to process each record
  • opts: Query options (default: [])

Options

  • :batch_size - Number of records to fetch at once (default: 1000)
  • :timeout - Query timeout in milliseconds (default: 30000)

Returns

Stream of processed records
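Conceptually, run_with can be thought of as mapping processor_fn over the stream of records. The sketch below models that idea without a database. `RunWithSketch` is a hypothetical module, the input list stands in for the stream that run/4 would return, and records are modelled as plain maps, which may not match the library's real record shape.

```elixir
defmodule RunWithSketch do
  # `records` stands in for the lazy stream a call to Neo4j.Stream.run/4
  # would return. The result is itself lazy: processor_fn runs only as
  # elements are consumed.
  def run_with(records, processor_fn) when is_function(processor_fn, 1) do
    Stream.map(records, processor_fn)
  end
end

# Records modelled as plain maps (an assumption made for this sketch)
records = [%{"n.amount" => 10}, %{"n.amount" => 32}]

total =
  records
  |> RunWithSketch.run_with(fn record -> record["n.amount"] end)
  |> Enum.sum()
```

Against a real driver, and going by the signature above, the equivalent call would presumably look like `Neo4j.Stream.run_with(driver, query, %{}, processor_fn, batch_size: 500)`, followed by a consumer such as `Stream.run/1` or an `Enum` function.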