Token-aware Load Balancing Policy

Token-aware policy is used to reduce network hops whenever possible by sending requests directly to the node that owns the data. Token-aware policy acts as a filter, wrapping another load balancing policy.

Token-aware policy uses schema metadata available in the cluster to determine the right partitioners and replication strategies for a given keyspace and locate replicas for a given statement.

In case replica node(s) cannot be found or reached, this policy falls back onto the wrapped policy plan.

Background

Given
a running cassandra cluster in 2 datacenters with 2 nodes in each
And
the following schema:
CREATE KEYSPACE simplex WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1};
CREATE TABLE simplex.songs (
  id uuid PRIMARY KEY,
  title text,
  album text,
  artist text,
  tags set<text>,
  data blob
);
INSERT INTO simplex.songs (id, title, album, artist, tags)
VALUES (
   756716f7-2e54-4715-9f00-91dcbea6cf50,
   'La Petite Tonkinoise',
   'Bye Bye Blackbird',
   'Joséphine Baker',
   {'jazz', '2013'})
;
INSERT INTO simplex.songs (id, title, album, artist, tags)
VALUES (
   f6071e72-48ec-4fcb-bf3e-379c8a696488,
   'Die Mösch',
   'In Gold',
   'Willi Ostermann',
   {'kölsch', '1996', 'birds'}
);
INSERT INTO simplex.songs (id, title, album, artist, tags)
VALUES (
   fbdf82ed-0063-4796-9c7c-a3d4f47b4b25,
   'Memo From Turner',
   'Performance',
   'Mick Jager',
   {'soundtrack', '1991'}
);
CREATE TABLE simplex.playlists (
  id uuid,
  title text,
  album text,
  artist text,
  song_id uuid,
  PRIMARY KEY ((id, title), album, artist)
);
INSERT INTO simplex.playlists (id, song_id, title, album, artist)
VALUES (
   2cc9ccb7-6221-4ccb-8387-f22b6a1b354d,
   756716f7-2e54-4715-9f00-91dcbea6cf50,
   'La Petite Tonkinoise',
   'Bye Bye Blackbird',
   'Joséphine Baker'
);
INSERT INTO simplex.playlists (id, song_id, title, album, artist)
VALUES (
   2cc9ccb7-6221-4ccb-8387-f22b6a1b354d,
   f6071e72-48ec-4fcb-bf3e-379c8a696488,
   'Die Mösch',
   'In Gold',
   'Willi Ostermann'
);
INSERT INTO simplex.playlists (id, song_id, title, album, artist)
VALUES (
   3fd2bedf-a8c8-455a-a462-0cd3a4353c54,
   fbdf82ed-0063-4796-9c7c-a3d4f47b4b25,
   'Memo From Turner',
   'Performance',
   'Mick Jager'
);
INSERT INTO simplex.playlists (id, song_id, title, album, artist)
VALUES (
   3fd2bedf-a8c8-455a-a462-0cd3a4353c54,
   756716f7-2e54-4715-9f00-91dcbea6cf50,
   'La Petite Tonkinoise',
   'Bye Bye Blackbird',
   'Joséphine Baker'
);

Requests are routed to the primary replica

Given
the following example:
require 'cassandra'

base_policy = Cassandra::LoadBalancing::Policies::DCAwareRoundRobin.new('dc2')
policy      = Cassandra::LoadBalancing::Policies::TokenAware.new(base_policy)
cluster     = Cassandra.cluster(load_balancing_policy: policy)
session     = cluster.connect('simplex')
statement   = session.prepare("SELECT token(id) FROM songs WHERE id = ?")

[
  Cassandra::Uuid.new('756716f7-2e54-4715-9f00-91dcbea6cf50'),
  Cassandra::Uuid.new('f6071e72-48ec-4fcb-bf3e-379c8a696488'),
  Cassandra::Uuid.new('fbdf82ed-0063-4796-9c7c-a3d4f47b4b25')
].each do |uuid|
  result  = session.execute(statement, arguments: [uuid])
  replica = result.execution_info.hosts.first
  total   = result.execution_info.hosts.size
  puts "uuid=#{uuid} token=#{result.first.values.first} replica=#{replica.ip} total=#{total}"
end
When
it is executed
Then
its output should contain:
uuid=756716f7-2e54-4715-9f00-91dcbea6cf50 token=-4565826248849633211 replica=127.0.0.4 total=1
uuid=f6071e72-48ec-4fcb-bf3e-379c8a696488 token=-1176857621403111796 replica=127.0.0.4 total=1
uuid=fbdf82ed-0063-4796-9c7c-a3d4f47b4b25 token=2440231132048646025 replica=127.0.0.3 total=1

Requests are routed according to wrapped policy plan when primary replica is down

Given
the following example:
require 'cassandra'

base_policy = Cassandra::LoadBalancing::Policies::DCAwareRoundRobin.new('dc2')
policy      = Cassandra::LoadBalancing::Policies::TokenAware.new(base_policy)
cluster     = Cassandra.cluster(load_balancing_policy: policy)
session     = cluster.connect('simplex')
statement   = session.prepare("SELECT token(id) FROM songs WHERE id = ?")

[
  Cassandra::Uuid.new('756716f7-2e54-4715-9f00-91dcbea6cf50'),
  Cassandra::Uuid.new('f6071e72-48ec-4fcb-bf3e-379c8a696488'),
  Cassandra::Uuid.new('fbdf82ed-0063-4796-9c7c-a3d4f47b4b25')
].each do |uuid|
  result  = session.execute(statement, arguments: [uuid], consistency: :one)
  replica = result.execution_info.hosts.first
  total   = result.execution_info.hosts.size
  puts "uuid=#{uuid} token=#{result.first.values.first} replica=#{replica.ip} total=#{total}"
end
And
node 4 is stopped
When
it is executed
Then
its output should contain:
uuid=756716f7-2e54-4715-9f00-91dcbea6cf50 token=-4565826248849633211 replica=127.0.0.3 total=1
uuid=f6071e72-48ec-4fcb-bf3e-379c8a696488 token=-1176857621403111796 replica=127.0.0.3 total=1
uuid=fbdf82ed-0063-4796-9c7c-a3d4f47b4b25 token=2440231132048646025 replica=127.0.0.3 total=1

Requests with compound partition keys are routed to the primary replica

Given
the following example:
require 'cassandra'

policy    = Cassandra::LoadBalancing::Policies::DCAwareRoundRobin.new('dc2')
policy    = Cassandra::LoadBalancing::Policies::TokenAware.new(policy)
cluster   = Cassandra.cluster(load_balancing_policy: policy)
session   = cluster.connect('simplex')
statement = session.prepare("SELECT token(id, title) FROM playlists WHERE id = ? AND title = ?")

[
  [Cassandra::Uuid.new('2cc9ccb7-6221-4ccb-8387-f22b6a1b354d'), 'La Petite Tonkinoise'],
  [Cassandra::Uuid.new('2cc9ccb7-6221-4ccb-8387-f22b6a1b354d'), 'Die Mösch'],
  [Cassandra::Uuid.new('3fd2bedf-a8c8-455a-a462-0cd3a4353c54'), 'Memo From Turner'],
  [Cassandra::Uuid.new('3fd2bedf-a8c8-455a-a462-0cd3a4353c54'), 'La Petite Tonkinoise'],
].each do |arguments|
  result  = session.execute(statement, arguments: arguments)
  replica = result.execution_info.hosts.first
  total   = result.execution_info.hosts.size
  puts "uuid=#{arguments[0]} title=#{arguments[0]} token=#{result.first.values.first} replica=#{replica.ip} total=#{total}"
end
When
it is executed
Then
its output should contain:
uuid=2cc9ccb7-6221-4ccb-8387-f22b6a1b354d title=2cc9ccb7-6221-4ccb-8387-f22b6a1b354d token=6231549073425362204 replica=127.0.0.3 total=1
uuid=2cc9ccb7-6221-4ccb-8387-f22b6a1b354d title=2cc9ccb7-6221-4ccb-8387-f22b6a1b354d token=-115815985718975675 replica=127.0.0.4 total=1
uuid=3fd2bedf-a8c8-455a-a462-0cd3a4353c54 title=3fd2bedf-a8c8-455a-a462-0cd3a4353c54 token=-463065628644986368 replica=127.0.0.4 total=1
uuid=3fd2bedf-a8c8-455a-a462-0cd3a4353c54 title=3fd2bedf-a8c8-455a-a462-0cd3a4353c54 token=-8087998491924709995 replica=127.0.0.4 total=1

Requests with local consistencies are routed to a remote datacenter when primary replica is down

Given
the following example:
require 'cassandra'

max_remote_hosts_to_use = nil
use_remote_hosts_for_local_consistency = true
base_policy = Cassandra::LoadBalancing::Policies::DCAwareRoundRobin.new('dc2',
                                                                         max_remote_hosts_to_use,
                                                                         use_remote_hosts_for_local_consistency)
policy      = Cassandra::LoadBalancing::Policies::TokenAware.new(base_policy)
cluster     = Cassandra.cluster(load_balancing_policy: policy)
session     = cluster.connect('simplex')
statement   = session.prepare("SELECT token(id) FROM songs WHERE id = ?")

[
  Cassandra::Uuid.new('756716f7-2e54-4715-9f00-91dcbea6cf50'),
  Cassandra::Uuid.new('f6071e72-48ec-4fcb-bf3e-379c8a696488')
].each do |uuid|
  result  = session.execute(statement, arguments: [uuid], consistency: :local_one)
  replica = result.execution_info.hosts.first
  total   = result.execution_info.hosts.size
  puts "uuid=#{uuid} token=#{result.first.values.first} replica=#{replica.ip} total=#{total}"
end
And
node 3 is stopped
And
node 4 is stopped
When
it is executed
Then
its output should contain:
uuid=756716f7-2e54-4715-9f00-91dcbea6cf50 token=-4565826248849633211 replica=127.0.0.2 total=1
uuid=f6071e72-48ec-4fcb-bf3e-379c8a696488 token=-1176857621403111796 replica=127.0.0.1 total=1