Kafka Backup Restore tool
This tool can restore Apache Kafka topic data from backups in Google
Cloud Storage stored by Aiven Kafka GCS Connector.

The tool assumes the correct topic configuration has been recreated,
and stores data in the original partitions in sequence.

While the tool cannot currently restore consumer group positions, it
outputs the offset differences between the original and the new
cluster. This information can be used to reposition consumers to the
correct offsets.
hnousiainen committed May 20, 2020
1 parent 7b05316 commit e21d03c
Showing 9 changed files with 504 additions and 0 deletions.
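
A minimal usage sketch for the tool described in the commit message above, based on the CLI added in kafka_restore/__main__.py below; the config file follows example_config.json, and the topic name here is only a placeholder:

    # Command-line invocation (one topic per run), assuming the package is on PYTHONPATH:
    #   python3 -m kafka_restore --config example_config.json --topic example-topic
    # Equivalent programmatic use:
    import json

    from kafka_restore.__main__ import KafkaRestore

    with open("example_config.json") as fh:
        config = json.load(fh)

    KafkaRestore(config=config).restore(topic="example-topic")
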
10 changes: 10 additions & 0 deletions .flake8
@@ -0,0 +1,10 @@
[flake8]
max-line-length = 125
ignore =
    E123, # Closing brackets indent
    E126, # Hanging indents
    E129, # Visual indent
    E501, # Max line length
    E722, # bare-except
    W503, # Breaks & binary operators
    W504, # line break after binary operator (conflicts with our yapf style)
50 changes: 50 additions & 0 deletions .pylintrc
@@ -0,0 +1,50 @@
[MASTER]
load-plugins=pylint_quotes

string-quote=double-avoid-escape
triple-quote=double
docstring-quote=double

[MESSAGES CONTROL]
disable=
    abstract-method,
    bad-continuation,
    chained-comparison, # R1716: Simplify chained comparison between the operands
    duplicate-code,
    fixme,
    invalid-name,
    len-as-condition,
    missing-docstring,
    no-else-return,
    no-else-raise,
    no-self-use,
    superfluous-parens,
    too-few-public-methods,
    too-many-ancestors,
    too-many-arguments,
    too-many-boolean-expressions,
    too-many-branches,
    too-many-instance-attributes,
    too-many-lines,
    too-many-locals,
    too-many-nested-blocks,
    too-many-public-methods,
    too-many-statements,
    ungrouped-imports,
    unused-argument,
    wrong-import-order,
    line-too-long,
    no-else-continue,
    no-else-break,
    import-outside-toplevel

[FORMAT]
max-line-length=125

[REPORTS]
output-format=text
reports=no
score=no

[TYPECHECK]
ignored-classes=responses
13 changes: 13 additions & 0 deletions Makefile
@@ -0,0 +1,13 @@
PYTHON ?= PYTHONDONTWRITEBYTECODE=1 python3
PYTHON_SOURCE_DIRS = kafka_restore

.PHONY: all
all:

.PHONY: pylint
pylint:
	$(PYTHON) -m pylint --rcfile .pylintrc $(PYTHON_SOURCE_DIRS)

.PHONY: flake8
flake8:
	$(PYTHON) -m flake8 kafka_restore $(PYTHON_SOURCE_DIRS)
13 changes: 13 additions & 0 deletions example_config.json
@@ -0,0 +1,13 @@
{
"kafka": {
"kafka_url": "kafka-example.aivencloud.com:11397",
"ssl_ca_file": "ca.crt",
"ssl_access_certificate_file": "service.crt",
"ssl_access_key_file": "service.key"
},
"object_storage": {
"type": "gcs",
"bucket": "example-backup-bucket",
"credentials_file": "example-gcs-credentials.json"
}
}
Empty file added kafka_restore/__init__.py
Empty file.
151 changes: 151 additions & 0 deletions kafka_restore/__main__.py
@@ -0,0 +1,151 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

from .object_storage.gcs import GCSProvider
from argparse import ArgumentParser
from tempfile import TemporaryDirectory

import codecs
import json
import kafka
import logging
import os
import re


class KafkaRestore:
    def __init__(self, *, config):
        self.log = logging.getLogger(self.__class__.__name__)
        self.config = config

        object_storage_config = self.config.get("object_storage", {})
        object_storage_type = object_storage_config.get("type")
        if object_storage_type == "gcs":
            self.object_storage = GCSProvider(config=object_storage_config)
        else:
            raise ValueError(f"Unknown object storage type: {object_storage_type}")

        kafka_config = self.config.get("kafka", {})

        self.kafka_producer = kafka.KafkaProducer(
            bootstrap_servers=kafka_config["kafka_url"],
            security_protocol="SSL",
            ssl_cafile=kafka_config["ssl_ca_file"],
            ssl_certfile=kafka_config["ssl_access_certificate_file"],
            ssl_keyfile=kafka_config["ssl_access_key_file"],
        )

    def list_topic_data_files(self, *, topic):
        topic_re = re.compile(
            (
                r"(?P<topic>" + re.escape(topic) + r")"
                r"-(?P<partition>[0-9]+)"
                r"-(?P<offset>[0-9]+)"
            )
        )

        topic_partition_files = {}

        for item in self.object_storage.list_items():
            matches = topic_re.match(item)
            if matches:
                partition = int(matches.group("partition"))
                if partition not in topic_partition_files:
                    topic_partition_files[partition] = []
                begin_offset = matches.group("offset")
                record = {
                    "begin_offset": int(begin_offset),
                    "object_name": item,
                }
                topic_partition_files[partition].append(record)

        for partition in topic_partition_files:
            topic_partition_files[partition] = sorted(topic_partition_files[partition], key=lambda x: x["begin_offset"])

        return topic_partition_files

    def parse_record(self, record_line):
        fields = record_line.split(",")
        if fields[0]:
            key = codecs.decode(codecs.encode(fields[0], "ascii"), "base64")
        else:
            key = None
        if fields[1]:
            value = codecs.decode(codecs.encode(fields[1], "ascii"), "base64")
        else:
            value = None
        offset = int(fields[2])
        if fields[3]:
            timestamp = int(fields[3])
        else:
            timestamp = None

        return key, value, offset, timestamp

    def restore(self, *, topic):
        topic_partition_files = self.list_topic_data_files(topic=topic)
        partition_offset_records = {}

        with TemporaryDirectory() as working_directory:
            while True:
                progress = False
                for partition in topic_partition_files:
                    if topic_partition_files[partition]:
                        progress = True
                        object_name = topic_partition_files[partition][0]["object_name"]
                        local_name = f"{working_directory}/{topic}-{partition}"

                        self.object_storage.get_contents_to_file(object_name, local_name)

                        with open(local_name, "r") as fh:
                            nrecords = 0
                            for line in fh.readlines():
                                key, value, offset, timestamp = self.parse_record(line.strip())
                                future_record = self.kafka_producer.send(
                                    topic,
                                    partition=partition,
                                    key=key,
                                    value=value,
                                    timestamp_ms=timestamp,
                                )
                                nrecords += 1
                                partition_offset_records[partition] = {
                                    "last_original_offset": offset,
                                    "last_produced_record": future_record,
                                }

                            self.log.info("Restored %d messages from object %r", nrecords, object_name)

                        os.unlink(local_name)
                        topic_partition_files[partition] = topic_partition_files[partition][1:]

                if not progress:
                    self.kafka_producer.flush()
                    break

        for partition in sorted(partition_offset_records):
            self.log.info(
                "Partition %d original offset %d new offset %d",
                partition,
                partition_offset_records[partition]["last_original_offset"],
                partition_offset_records[partition]["last_produced_record"].get().offset,
            )


def main():
    logging.basicConfig(level=logging.INFO, format="%(name)-20s %(levelname)-8s %(message)s")

    parser = ArgumentParser()
    parser.add_argument("-c", "--config", required=True, help="Path to config file")
    parser.add_argument("-t", "--topic", required=True, help="Topic name")
    args = parser.parse_args()

    with open(args.config) as fh:
        restore_config = json.load(fh)

    kafka_restore = KafkaRestore(config=restore_config)

    kafka_restore.restore(topic=args.topic)


if __name__ == "__main__":
    main()
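
The restore loop above expects backup objects named <topic>-<partition>-<begin offset> and, per parse_record(), one message per line as comma-separated base64 key, base64 value, original offset and millisecond timestamp. A small sketch of that record format with made-up values (the object name and payload below are purely illustrative):

    import codecs

    # A backup object named "example-topic-0-0000000000" would match partition 0, begin offset 0.
    # One record line: key b"k1", value b"v1", original offset 42, timestamp 1589961600000 ms.
    key_b64 = codecs.encode(b"k1", "base64").decode("ascii").strip()
    value_b64 = codecs.encode(b"v1", "base64").decode("ascii").strip()
    line = f"{key_b64},{value_b64},42,1589961600000"
    print(line)  # azE=,djE=,42,1589961600000
    # parse_record() above decodes such a line back to (b"k1", b"v1", 42, 1589961600000).
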
Empty file added kafka_restore/object_storage/__init__.py
Empty file.
61 changes: 61 additions & 0 deletions kafka_restore/object_storage/base.py
@@ -0,0 +1,61 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import codecs
import datetime
import dateutil.parser
import dateutil.tz
import logging


class ObjectStorageProvider:
    def __init__(self, *, config):
        self.log = logging.getLogger(self.__class__.__name__)
        self.config = config

    @classmethod
    def base64_to_hex(cls, b64val):
        if isinstance(b64val, str):
            b64val = b64val.encode("ascii")
        rawval = codecs.decode(b64val, "base64")
        hexval = codecs.encode(rawval, "hex")
        return hexval.decode("ascii")

    @classmethod
    def parse_timestamp(cls, ts, *, with_tz=True, assume_local=False):
        """Parse a given timestamp and return a datetime object with or without tzinfo.
        If `with_tz` is False and we can't parse a timezone from the timestamp the datetime object is returned
        as-is and we assume the timezone is whatever was requested. If `with_tz` is False and we can parse a
        timezone, the timestamp is converted to either local or UTC time based on `assume_local` after which tzinfo
        is stripped and the timestamp is returned.
        When `with_tz` is True and there's a timezone in the timestamp we return it as-is. If `with_tz` is True
        but we can't parse a timezone we add either local or UTC timezone to the datetime based on `assume_local`.
        """
        parse_result = dateutil.parser.parse(ts)

        # pylint thinks dateutil.parser.parse always returns a tuple even though we didn't request it.
        # So this check is pointless but convinces pylint that we really have a datetime object now.
        dt = parse_result[0] if isinstance(parse_result, tuple) else parse_result  # pylint: disable=unsubscriptable-object

        if with_tz is False:
            if not dt.tzinfo:
                return dt

            tz = dateutil.tz.tzlocal() if assume_local else datetime.timezone.utc
            return dt.astimezone(tz).replace(tzinfo=None)

        if dt.tzinfo:
            return dt

        tz = dateutil.tz.tzlocal() if assume_local else datetime.timezone.utc
        return dt.replace(tzinfo=tz)

    def list_items(self):
        raise NotImplementedError("list_items")

    def get_contents_to_file(self, key, filepath):
        raise NotImplementedError("get_contents_to_file")

    def get_contents_to_fileobj(self, key, fileobj):
        raise NotImplementedError("get_contents_to_fileobj")