Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support protocol v3 #46

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ Common parameters:
migrations are run against. This option must be used
in conjuction with the -k option. This option is
ignored unless the -s option is provided.
--protocol-version PROTOCOL_VERSION
Protocol version used to connect to Cassandra.
-y, --assume-yes Automatically answer "yes" for all questions

migrate
Expand Down
6 changes: 5 additions & 1 deletion cassandra_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def main():
migrations are run against. This option must be used in
conjuction with the -k option. This option is ignored
unless the -s option is provided.""")
parser.add_argument('--protocol-version', type=int, default=None,
choices=range(1, 6),
help='Protocol version used to connect to Cassandra.')
parser.add_argument('-y', '--assume-yes', action='store_true',
help='Automatically answer "yes" for all questions')

Expand Down Expand Up @@ -139,7 +142,8 @@ def main():
user=opts.user, password=opts.password,
host_cert_path=opts.ssl_cert,
client_key_path=opts.ssl_client_private_key,
client_cert_path=opts.ssl_client_cert) as migrator:
client_cert_path=opts.ssl_client_cert,
protocol_version=opts.protocol_version) as migrator:
cmd_method = getattr(migrator, opts.action)
if not callable(cmd_method):
print('Error: invalid command', file=sys.stderr)
Expand Down
41 changes: 31 additions & 10 deletions cassandra_migrate/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@
VALUES (%s, %s, %s, %s, %s, %s, toTimestamp(now())) IF NOT EXISTS
"""

CREATE_DB_VERSION_PROTOCOL_3 = """
INSERT INTO "{keyspace}"."{table}"
(id, version, name, content, checksum, state, applied_at)
VALUES (%s, %s, %s, %s, %s, %s, dateof(now())) IF NOT EXISTS
"""

FINALIZE_DB_VERSION = """
UPDATE "{keyspace}"."{table}" SET state = %s WHERE id = %s IF state = %s
"""
Expand Down Expand Up @@ -116,7 +122,8 @@ class Migrator(object):

def __init__(self, config, profile='dev', hosts=['127.0.0.1'], port=9042,
user=None, password=None, host_cert_path=None,
client_key_path=None, client_cert_path=None):
client_key_path=None, client_cert_path=None,
protocol_version=None):
self.config = config

try:
Expand All @@ -137,14 +144,21 @@ def __init__(self, config, profile='dev', hosts=['127.0.0.1'], port=9042,
else:
ssl_options = None

self.cluster = Cluster(
contact_points=hosts,
port=port,
auth_provider=auth_provider,
max_schema_agreement_wait=300,
control_connection_timeout=10,
connect_timeout=30,
ssl_options=ssl_options)
cluster_kwargs = {
"contact_points": hosts,
"port": port,
"auth_provider": auth_provider,
"max_schema_agreement_wait": 300,
"control_connection_timeout": 10,
"connect_timeout": 30,
"ssl_options": ssl_options,
}
# Cluster defaults `protocol_version` to `cluster._NOT_SET`
# Only pass it to `Cluster` if it has been set, otherwise let it use
# its default
if protocol_version is not None:
cluster_kwargs["protocol_version"] = protocol_version
self.cluster = Cluster(**cluster_kwargs)

self._session = None

Expand Down Expand Up @@ -377,8 +391,15 @@ def _create_version(self, version, migration):
version, migration))

version_id = uuid.uuid4()

# Cassandra versions below 2.2 use protocol 3 and use `dateof` instead
# of `toTimestamp`
if self.cluster.protocol_version <= 3:
statement = CREATE_DB_VERSION_PROTOCOL_3
else:
statement = CREATE_DB_VERSION
result = self._execute(
self._q(CREATE_DB_VERSION),
self._q(statement),
(version_id, version, migration.name, migration.content,
bytearray(migration.checksum), Migration.State.IN_PROGRESS))

Expand Down