Internal stream processor for SuperCache buffer queues.
This module bridges the internal message-passing queue (SuperCache.Internal.Queue)
with the caching layer (SuperCache). It creates a Stream that continuously
pulls items from a named queue and pushes them into the cache via SuperCache.put/1.
Design
The stream is built using Stream.resource/3:
- Accumulator - The queue name/atom passed to
create/1. - Next function - Calls
Queue.get/1to fetch a batch of items. - Cleanup function - No-op, as the queue lifecycle is managed externally.
The stream is consumed by make_stream_pipe/1, which pipes each item
through push/1 and runs the stream to completion.
Error Handling
- If
Queue.get/1returns{:error, :timeout}, the stream terminates gracefully. - If
SuperCache.put/1raises, the error is logged and the stream continues processing subsequent items (fault-tolerant design).
Warning
This is an internal module. Do not use it directly in application code.
Use SuperCache.Buffer or SuperCache.lazy_put/1 instead.
Summary
Types
Queue name or PID used as the stream accumulator.
Types
Functions
@spec create(queue_ref()) :: Enumerable.t()
Creates a Stream that continuously pulls items from the given queue.
The stream yields batches of items returned by Queue.get/1.
If the queue returns {:error, :timeout}, the stream halts.
Examples
stream = SuperCache.Internal.Stream.create(:my_buffer)
Stream.take(stream, 10) |> Enum.to_list()
@spec make_stream_pipe(Enumerable.t()) :: :ok
Consumes an enumerable and pushes each item into the cache.
Runs the stream to completion. Errors during push/1 are caught and logged
so that a single bad item does not halt the entire stream.
Examples
:my_buffer
|> SuperCache.Internal.Stream.create()
|> SuperCache.Internal.Stream.make_stream_pipe()
@spec push(tuple()) :: :ok
Pushes a single item into the SuperCache.
Wraps SuperCache.put/1 with error handling. If the put fails, the error
is logged but not raised, ensuring the stream continues processing.
Examples
SuperCache.Internal.Stream.push({:user, 1, "Alice"})
# => :ok