View Source Xandra.Cluster (Xandra v0.19.0)

Connection to a Cassandra cluster.

This module is a "proxy" connection with support for connecting to multiple nodes in a Cassandra cluster and executing queries on such nodes based on a given policy.

Usage

This module manages pools of connections to different nodes in a Cassandra cluster. Each pool is a pool of Xandra connections to a specific node.

The API provided by this module mirrors the API provided by the Xandra module. Queries executed through this module will be "routed" to nodes in the provided list of nodes based on a policy. See the "Load balancing policies" section.

Regardless of the underlying pool, Xandra.Cluster will establish one extra connection to a node in the cluster for internal purposes. We refer to this connection as the control connection.

Here is an example of how one could use Xandra.Cluster to connect to a cluster:

Xandra.Cluster.start_link(
  nodes: ["cassandra1.example.net", "cassandra2.example.net"],
  pool_size: 10,
)

The code above will establish a pool of ten connections to each of the nodes specified in :nodes, plus one extra connection used for internal purposes, for a total of twenty-one connections going out of the machine.

Child Specification

Xandra.Cluster implements a child_spec/1 function, so it can be used as a child under a supervisor:

children = [
  # ...,
  {Xandra.Cluster, nodes: ["cassandra-seed.example.net"]}
]

Contact Points and Cluster Discovery

Xandra.Cluster auto-discovers peer nodes in the cluster, by using the system.peers built-in Cassandra table. Once Xandra discovers peers, it opens a pool of connections to a subset of the peers based on the chosen load-balancing policy (see below).

The :nodes option in start_link/1 specifies the contact points. The contact points are used to discover the rest of the nodes in the cluster. It's generally a good idea to provide multiple contacts points, so that if some of those are unreachable, the others can be used to discover the rest of the cluster. Xandra.Cluster tries to connect to contact points in the order they are specified in the :nodes option, initially ignoring the chosen load-balancing policy. Once a connection is established, then that contact point is used to discover the rest of the cluster and open connection pools according to the load-balancing policy.

Xandra also refreshes the cluster topology periodically. See the :refresh_topology_interval option in start_link/1.

Load-balancing Policies

Xandra.Cluster uses customizable "load-balancing policies" to manage nodes in the cluster. A load-balancing policy is a module that implements the Xandra.Cluster.LoadBalancing behaviour. Xandra uses load-balancing policies for these purposes:

  • Choosing which node to execute a query on
  • Choosing which nodes to open pools of connections to (see the :target_pools option in start_link/1)
  • Choosing which node the control connection connects to (or re-connects to in case of disconnections)

Xandra ships with the following built-in load-balancing policies:

Disconnections and Reconnections

Xandra.Cluster also supports nodes disconnecting and reconnecting: if Xandra detects one of the nodes in :nodes going down, it will not execute queries against it anymore, but will start executing queries on it as soon as it detects such node is back up.

If all specified nodes happen to be down when a query is executed, a Xandra.ConnectionError with reason {:cluster, :not_connected} will be returned.

Telemetry

Xandra.Cluster emits several Telemetry events to help you log, instrument, and debug your application. See the Telemetry Events page in the guides for a comprehensive list of the events that Xandra emits.

Summary

Types

A Xandra cluster.

Cluster-specific options for start_link/1.

Functions

Returns a list of hosts that the cluster has outgoing connections to.

Same as execute/4 but with optional arguments.

Executes a query on a node in the cluster.

Same as execute/3 but returns the result directly or raises in case of errors.

Same as execute/4 but returns the result directly or raises in case of errors.

Same as prepare/3 but raises in case of errors.

Runs a function with a given connection.

Starts connections to a cluster.

Synchronously stops the given cluster with the given reason.

Types

@type cluster() :: GenServer.server()

A Xandra cluster.

Link to this type

start_option()

View Source (since 0.15.0)
@type start_option() ::
  {:nodes, [term()]}
  | {:load_balancing, term() | {module(), [term()]}}
  | {:autodiscovery, boolean()}
  | {:autodiscovered_nodes_port, 0..65535}
  | {:refresh_topology_interval, timeout()}
  | {:target_pools, pos_integer()}
  | {:name, term()}
  | {:sync_connect, timeout() | term()}
  | {:queue_checkouts_before_connecting, keyword()}
  | {:pool_size, pos_integer()}
  | {:debug, term()}
  | {:spawn_opt, term()}
  | {:hibernate_after, term()}
  | {:xandra_module, atom()}
  | {:control_connection_module, atom()}
  | {:test_discovered_hosts, term()}

Cluster-specific options for start_link/1.

Some of these options are internal and not part of the public API. Only use the options explicitly documented in start_link/1.

Functions

Link to this function

connected_hosts(cluster)

View Source (since 0.18.0)
@spec connected_hosts(cluster()) :: [Xandra.Cluster.Host.t()]

Returns a list of hosts that the cluster has outgoing connections to.

Link to this function

execute(cluster, query, params_or_options \\ [])

View Source
@spec execute(cluster(), Xandra.statement() | Xandra.Prepared.t(), Xandra.values()) ::
  {:ok, Xandra.result()} | {:error, Xandra.error()}
@spec execute(cluster(), Xandra.Batch.t(), keyword()) ::
  {:ok, Xandra.Void.t()} | {:error, Xandra.error()}

Same as execute/4 but with optional arguments.

Link to this function

execute(cluster, query, params, options)

View Source
@spec execute(
  cluster(),
  Xandra.statement() | Xandra.Prepared.t(),
  Xandra.values(),
  keyword()
) ::
  {:ok, Xandra.result()} | {:error, Xandra.error()}
@spec execute(
  cluster(),
  Xandra.statement() | Xandra.Prepared.t(),
  Xandra.values(),
  keyword()
) ::
  Xandra.result()

Executes a query on a node in the cluster.

This function executes a query on a node in the cluster. The node is chosen based on the load-balancing policy given in start_link/1.

Supports the same options as Xandra.execute/4. In particular, the :retry_strategy option is cluster-aware, meaning that queries are retried on possibly different nodes in the cluster.

Link to this function

execute!(cluster, query, params_or_options \\ [])

View Source
@spec execute!(cluster(), Xandra.statement() | Xandra.Prepared.t(), Xandra.values()) ::
  Xandra.result()
@spec execute!(cluster(), Xandra.Batch.t(), keyword()) :: Xandra.Void.t()

Same as execute/3 but returns the result directly or raises in case of errors.

Link to this function

execute!(cluster, query, params, options)

View Source

Same as execute/4 but returns the result directly or raises in case of errors.

Link to this function

prepare(cluster, statement, options \\ [])

View Source
@spec prepare(cluster(), Xandra.statement(), keyword()) ::
  {:ok, Xandra.Prepared.t()} | {:error, Xandra.error()}

Same as Xandra.prepare/3.

Preparing a query through Xandra.Cluster will prepare it only on one node, according to the load-balancing policy chosen in start_link/1. To prepare and execute a query on the same node, you could use run/3:

Xandra.Cluster.run(cluster, fn conn ->
  # "conn" is the pool of connections for a specific node.
  prepared = Xandra.prepare!(conn, "SELECT * FROM system.local")
  Xandra.execute!(conn, prepared, _params = [])
end)

Thanks to the prepared query cache, we can always reprepare the query and execute it because after the first time (on each node) the prepared query will be fetched from the cache. However, if a prepared query is unknown on a node, Xandra will prepare it on that node on the fly, so we can simply do this as well:

prepared = Xandra.Cluster.prepare!(cluster, "SELECT * FROM system.local")
Xandra.Cluster.execute!(cluster, prepared, _params = [])

Note that this goes through the cluster twice, so there's a high chance that the query will be prepared on one node and then executed on another node. This is however useful if you want to use the :retry_strategy option in execute!/4: in the run/3 example above, if you use :retry_strategy with Xandra.execute!/3, the query will be retried on the same pool of connections to the same node. execute!/4 will retry queries going through the cluster again instead.

Link to this function

prepare!(cluster, statement, options \\ [])

View Source
@spec prepare!(cluster(), Xandra.statement(), keyword()) :: Xandra.Prepared.t()

Same as prepare/3 but raises in case of errors.

If the function is successful, the prepared query is returned directly instead of in an {:ok, prepared} tuple like in prepare/3.

Link to this function

run(cluster, options \\ [], fun)

View Source
@spec run(cluster(), keyword(), (Xandra.conn() -> result)) :: result when result: var

Runs a function with a given connection.

The connection that is passed to fun is a Xandra connection, not a cluster. This means that you should call Xandra functions on it. Since the connection is a single connection, it means that it's a connection to a specific node, so you can do things like prepare a query and then execute it because you can be sure it's prepared on the same node where you're executing it.

Examples

query = "SELECT * FROM system_schema.keyspaces"

Xandra.Cluster.run(cluster, fn conn ->
  prepared = Xandra.prepare!(conn, query)
  Xandra.execute!(conn, prepared, _params = [])
end)
@spec start_link([option]) :: GenServer.on_start()
when option: Xandra.start_option() | start_option()

Starts connections to a cluster.

Options

This function accepts all options accepted by Xandra.start_link/1 and and forwards them to each underlying connection or pool of connections. The following options are specific to this function:

  • :nodes (list of String.t/0) - A list of nodes to use as contact points when setting up the cluster. Each node in this list must be a hostname ("cassandra.example.net"), IPv4 ("192.168.0.100"), or IPv6 ("16:64:c8:0:2c:58:5c:c7") address. An optional port can be specified by including :<port> after the address, such as "cassandra.example.net:9876". See the Contact points and cluster discovery section in the module documentation. The default value is ["127.0.0.1"].

  • :load_balancing ({module(), term()} or :random) - Load balancing "policy". See the Load balancing policies section in the module documentation. The policy must be expressed as a {module, options} tuple, where module is a module that implements the Xandra.Cluster.LoadBalancingPolicy behaviour, and options is any term that is passed to the Xandra.Cluster.LoadBalancingPolicy.init/1 callback. This option changed in v0.15.0. Before v0.15.0, the only supported values were :priority and :random. :random is deprecated in favor of using {Xandra.Cluster.LoadBalancingPolicy.Random, []}. :priority has been removed. The default value is :random.

  • :autodiscovery (boolean/0) - (deprecated) Whether to enable autodiscovery. Since v0.15.0, this option is deprecated and autodiscovery is always enabled.

  • :autodiscovered_nodes_port (:inet.port_number/0) - The port to use when connecting to autodiscovered nodes. Cassandra does not advertise the port of nodes when discovering them, so you'll need to specify one explicitly. This might get fixed in future Cassandra versions. The default value is 9042.

  • :refresh_topology_interval (timeout/0) - The interval at which Xandra will refresh the cluster topology by querying the control connection to discover peers. When the connection refreshes the topology, it will also start and stop pools for new and removed nodes, effectively "syncing" with the cluster. Available since v0.15.0. The default value is 300000.

  • :target_pools (pos_integer/0) - The number of nodes to start pools to. Each pool will use the :pool_size option to determine how many single connections to open to that node. This number is a target number, which means that sometimes there might not be enough nodes to start this many pools. Xandra won't ever start more than :target_pools pools. Available since v0.15.0. The default value is 2.

  • :name (term/0) - The name to register this cluster under. Follows the name registration rules of GenServer.

  • :sync_connect - Whether to wait for at least one connection to a node in the cluster to be established before returning from start_link/1. If false, connecting is async, which means that even if start_link/1 returns {:ok, pid}, that's the PID of the cluster process, which has not necessarily established any connections yet. If this option is an integer or :infinity (that is, a term of type timeout/0), then this function only returns when at least one node connection is established. If the timeout expires, this function returns {:error, :sync_connect_timeout}. Available since v0.16.0.

    This is only useful in rare cases when you want to make sure that the has connected at least once before returning from start_link/1. This is fragile though, because the cluster could connect once and then drop connections right away, so this doesn't mean that the cluster is connected, but rather that it connected at least once. This is useful, for example, in test suites where you're not worried about resiliency but rather race conditions. In most cases, the :queue_checkouts_before_connecting option is what you want.

    The default value is false.

  • :queue_checkouts_before_connecting (keyword/0) - Controls how to handle checkouts that go through the cluster before the cluster is able to establish a connection to any node. Whenever you run a cluster function, the cluster checks out a connection from one of the connected nodes and executes the request on that connection. However, if you try to run any cluster function before the cluster connects to any of the nodes, you'll likely get Xandra.ConnectionErrors with reason {:cluster, :not_connected}. This is because the cluster needs to establish at least one connection to one node before it can execute requests. This option addresses this issue by queueing "checkout requests" until the cluster establishes a connection to a node. Once the connection is established, the cluster starts to hand over connections. If you want to disable this behavior, set :max_size to 0. Available since v0.18.0. This option supports the following sub-options: The default value is [].

    • :max_size (non_neg_integer/0) - The number of checkouts to queue in the cluster and flush as soon as a connection is established. The default value is 100.

    • :timeout (timeout/0) - How long to hold on to checkout requests for. When this timeout expires, all requests are dropped and a connection error is returned to each caller. The default value is 5000.

  • :pool_size (pos_integer/0) - The number of connections to open to each node in the cluster. Available since v0.18.0. The default value is 1.

  • :debug (term/0) - Same as the :debug option in GenServer.start_link/3. Available since v0.18.0.

  • :spawn_opt (term/0) - Same as the :spawn_opt option in GenServer.start_link/3. Available since v0.18.0.

  • :hibernate_after (term/0) - Same as the :hibernate_after option in GenServer.start_link/3. Available since v0.18.0.

Control connection

A Xandra.Cluster starts one additional "control connection" to one of the nodes in the cluster. This could be a node in the given :nodes (a contact point) or a discovered peer in the cluster. See the Contact points and cluster discovery section in the module documentation.

Examples

Starting a Xandra cluster using two nodes as the contact points:

{:ok, cluster} =
  Xandra.Cluster.start_link(nodes: ["cassandra1.example.net", "cassandra2.example.net"])

Starting a pool of five connections to each node in the same cluster as the given contact point:

{:ok, cluster} =
  Xandra.Cluster.start_link(nodes: ["cassandra-seed.example.net"], pool_size: 5)

Passing options down to each connection:

{:ok, cluster} =
  Xandra.Cluster.start_link(
    nodes: ["cassandra.example.net"],
    keyspace: "my_keyspace"
  )
Link to this function

stop(cluster, reason \\ :normal, timeout \\ :infinity)

View Source (since 0.15.0)
@spec stop(cluster(), term(), timeout()) :: :ok

Synchronously stops the given cluster with the given reason.

Waits timeout milliseconds for the cluster to stop before aborting and exiting.

Link to this function

stream_pages!(cluster, query, params, options \\ [])

View Source
@spec stream_pages!(
  cluster(),
  Xandra.statement() | Xandra.Prepared.t(),
  Xandra.values(),
  keyword()
) :: Enumerable.t()

Returns a stream of pages.

When streaming pages through a cluster, the streaming is done from a single node, that is, this function just calls out to Xandra.stream_pages!/4 after choosing a node appropriately.

All options are forwarded to Xandra.stream_pages!/4, including retrying options.