Implementing custom load balancing policies
To implement a load balancing policy, you must implement all of the methods
specified in Cassandra::LoadBalancing::Policy
.
Currently, load balancing policies are required to be thread-safe.
The object returned from the plan
method must implement all methods of
Cassandra::LoadBalancing::Plan
Plan will be accessed from multiple threads, but never in parallel and it doesn’t have to be thread-safe.
Background
- Given
- a running cassandra cluster with schema:
CREATE KEYSPACE simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}; CREATE TABLE simplex.songs ( id uuid PRIMARY KEY, title text, album text, artist text, tags set<text>, data blob ); INSERT INTO simplex.songs (id, title, album, artist, tags) VALUES ( 756716f7-2e54-4715-9f00-91dcbea6cf50, 'La Petite Tonkinoise', 'Bye Bye Blackbird', 'Joséphine Baker', {'jazz', '2013'}) ; INSERT INTO simplex.songs (id, title, album, artist, tags) VALUES ( f6071e72-48ec-4fcb-bf3e-379c8a696488, 'Die Mösch', 'In Gold', 'Willi Ostermann', {'kölsch', '1996', 'birds'} ); INSERT INTO simplex.songs (id, title, album, artist, tags) VALUES ( fbdf82ed-0063-4796-9c7c-a3d4f47b4b25, 'Memo From Turner', 'Performance', 'Mick Jager', {'soundtrack', '1991'} );
A policy to ignore a certain keyspace
- Given
- a file named “ignoring_keyspace_policy.rb” with:
class IgnoringKeyspacePolicy class Plan def has_next? false end def next nil end end def initialize(keyspace_to_ignore, original_policy) @keyspace = keyspace_to_ignore @policy = original_policy end def setup(cluster) end def teardown(cluster) end def plan(keyspace, statement, options) if @keyspace == keyspace Plan.new else @policy.plan(keyspace, statement, options) end end def distance(host) @policy.distance(host) end def host_found(host) @policy.host_found(host) end def host_lost(host) @policy.host_lost(host) end def host_up(host) @policy.host_up(host) end def host_down(host) @policy.host_down(host) end end
- And
- the following example:
require 'cassandra' require 'ignoring_keyspace_policy' policy = IgnoringKeyspacePolicy.new('simplex', Cassandra::LoadBalancing::Policies::RoundRobin.new) cluster = Cassandra.cluster(load_balancing_policy: policy) session = cluster.connect('simplex') begin session.execute("SELECT * FROM songs") puts "failure" rescue Cassandra::Errors::NoHostsAvailable puts "success" end
- When
- it is executed
- Then
- its output should contain:
success
A policy to ignore certain hosts
- Given
- a file named “blacklist_policy.rb” with:
class BlackListPolicy def initialize(ips_to_ignore, original_policy) @ips = ::Set.new @policy = original_policy ips_to_ignore.each do |ip| case ip when ::IPAddr @ips << ip when ::String @ips << ::IPAddr.new(ip) end end end def setup(cluster) end def teardown(cluster) end def plan(keyspace, statement, options) @policy.plan(keyspace, statement, options) end def distance(host) @policy.distance(host) end def host_found(host) @policy.host_found(host) unless @ips.include?(host.ip) end def host_lost(host) @policy.host_lost(host) unless @ips.include?(host.ip) end def host_up(host) @policy.host_up(host) unless @ips.include?(host.ip) end def host_down(host) @policy.host_down(host) unless @ips.include?(host.ip) end end
- And
- the following example:
require 'cassandra' require 'blacklist_policy' policy = BlackListPolicy.new(['127.0.0.2', '127.0.0.3'], Cassandra::LoadBalancing::Policies::RoundRobin.new) cluster = Cassandra.cluster(load_balancing_policy: policy) session = cluster.connect('simplex') host_ips = cluster.hosts.map(&:ip).sort coordinator_ips = 3.times.map do info = session.execute("SELECT * FROM songs").execution_info info.hosts.last.ip end puts "Cluster hosts:" puts host_ips puts "" puts "Hosts used:" puts coordinator_ips.sort.uniq
- When
- it is executed
- Then
- its output should contain:
Cluster hosts: 127.0.0.1 127.0.0.2 127.0.0.3 Hosts used: 127.0.0.1