Airflow 迁移:从 MySQL 5.7 到 PostgreSQL 13
背景与场景
Airflow是数据工程领域内的一个非常受欢迎的调度框架,其对PostgreSQL(以下简称PG)的支持远好于MySQL。同时,在更广泛的全球化开发者社区内,尤其是在数据应用场景中,PG比MySQL更受人青睐。
而中国大陆的技术环境内,谈起数据库,尽管PG比MySQL的功能更强大、生态更完善,但MySQL的开发者亲和性似乎更胜一筹。这就造成了Airflow + MySQL的技术组合,尽管这种技术组合能够满足基本的程序运行,但在大规模、高频率调度场景和使用Airflow高级功能时,总会遇到各种难缠的问题。
笔者手边恰好有一套这样的环境,这套环境运行着相对老旧的(四年前的)Airflow版本,这次趁版本升级之际,有机会把一些曾经屡次出现的问题一劳永逸解决掉,也就是说,可以做一次从MySQL到PG的数据迁移。
技术路径分析
RDMBS的导出与导入
从RDBMS的角度分析这个问题,最简单的做法可以是:把数据从MySQL导出,然后再导入PG。
这种做法在今天看来,存在三个问题:
- 导出和导入的载体是文本,无法承载二进制数据(如JSON、Binary等数据类型)
- 对于大单表缺乏并发性,性能低
- 对于跨库场景,无法做到严谨的类型映射
这三个问题恰恰是这个数据迁移场景中最关键的三个问题。
二进制的做法
在这个场景中,最关键的一点是,对于二进制数据类型,考虑到信息丢失和JSON转化的麻烦,肯定是不能做文本化处理,因此一定需要使用二进制的数据传输方式。
经过思考和调研,一共发现三种实现途径:
- Binary Serialization
- pgloader
- Spark
Binary Serialization
既然不能转换为文本方式,那么可以用更安全的二进制序列化方式实现。
秉着这个原则,笔者经过检索,最终确认,可以使用protobuf、msgpack等库来实现二进制序列化。那么其实在这种场景中,只需要三步即可实现迁移过程:
- 用JDBC将数据从MySQL读出来
- 二进制序列化
- 用JDBC将数据写入PG
以上三步确实可以完成整个迁移,不过在这之外,还会有一些workaround或者陷阱。
用Python语言可以吗?
理论上可以,实际上会比较困难。
Python用以对接数据库的驱动包,本质上实现的是数据库的通信协议。对于MySQL来说,Python有两个不同的包,分别用C和Python实现了MySQL的通信协议。而不幸的是,笔者曾在过往的开发经验中,遇到过这两个版本驱动包在协议实现上的差异,这在数据不能有分毫误差的场景里是无法接受的。
此外,Python软件生态惯有的低质量给笔者留下了非常不好的印象,笔者无法在严谨的工程中不经测试直接信任Python的软件包。
用ORM框架可以吗?
或许可以。
对于一个成熟的ORM框架来说,应当处理好常见数据库的各种数据类型。但对于这种一次性操作来说,考虑到Python领域没有特别好用的ORM框架,Java的ORM又都特别繁重,引入一个ORM框架实在是徒增复杂性。
经过评估,引入ORM的开发量和使用二进制序列化的开发繁琐程度没有差太多。
读出数据直接INSRET INTO可以吗?
不可以。
写成INSERT INTO的形式,就是文本序列化。
pgloader
pgloader是PG生态里的一个工具,由Common Lisp写成,专门用于针对PG的数据迁移场景。
原本对该方法寄予厚望,但在尝试过程中,屡次遇到无法解决的奇怪bug,遂作罢。
Spark
在如今Datalake大行其道的时代,数据搬运是一件再平常不过的事情。而在这个具体的场景中,前文说到使用Binary Serialization方法中的三步,Spark恰好能完美契合,所以这就成了试错成本最低的方法:只需用Spark将数据读出来、写进去就好。
后来的实践证明,这是最简单、最可行、综合成本最低的方法。
生产环境实践操作
本小节代码仅做演示,不代表真实运行情况。
实际环境的代码应当环境实际情况进行调整。
借用Spark的能力,生产环境实操变得非常简单,共包含以下几步:
- 在PG中建立新的Airflow数据库,名为
airflow_pg,注意这个数据库需要是空的# 1. 初始化Airflow数据库 airflow db init # 2. 导出数据库表结构 pg_dump -d airflow_pg -s -f airflow_pg_schema.sql # 3.删除刚刚新建的数据库,使用表结构重建Airflow数据库 psql -c 'DROP DATABASE airflow_pg; CREATE DATABASE airflow_pg' psql -c '\c airflow_pg' -f airflow_pg_schema.sql - 对
airflow_pg解除主外键约束-- 根据Airflow实际版本确认有哪些表 alter table ab_permission disable trigger all; alter table ab_permission_view disable trigger all; alter table ab_permission_view_role disable trigger all; alter table ab_register_user disable trigger all; alter table ab_role disable trigger all; alter table ab_user disable trigger all; alter table ab_user_role disable trigger all; alter table ab_view_menu disable trigger all; alter table alembic_version disable trigger all; alter table connection disable trigger all; alter table dag disable trigger all; alter table dag_code disable trigger all; alter table dag_pickle disable trigger all; alter table dag_run disable trigger all; alter table dag_tag disable trigger all; alter table import_error disable trigger all; alter table job disable trigger all; alter table log disable trigger all; alter table rendered_task_instance_fields disable trigger all; alter table sensor_instance disable trigger all; alter table serialized_dag disable trigger all; alter table sla_miss disable trigger all; alter table slot_pool disable trigger all; alter table task_fail disable trigger all; alter table task_instance disable trigger all; alter table task_reschedule disable trigger all; alter table variable disable trigger all; alter table xcom disable trigger all; alter table celery_taskmeta disable trigger all; alter table celery_tasksetmeta disable trigger all; - 使用Spark读取
airflow_mysql数据,并依次写入airflow_pg# 根据Airflow实际情况确定哪些表需要迁移 # 注意: # 1. 如`log`、`celery_taskmeta`表不需要迁移 # 2. option('stringtype', 'unspecified')的作用是正确处理表中二进制字段 df = spark.read.format('jdbc')... df.write.format('jdbc')\ .option('stringtype', 'unspecified')\ ... - 在
airflow_pg中指定各个自增主键的位置,对应到各个表ID的最大值-- 根据Airflow实际版本确认有哪自增主键 select setval('ab_permission_id_seq', (select max(id) from ab_permission)); select setval('ab_permission_view_id_seq', (select max(id) from ab_permission_view)); select setval('ab_permission_view_role_id_seq', (select max(id) from ab_permission_view_role)); select setval('ab_register_user_id_seq', (select max(id) from ab_register_user)); select setval('ab_role_id_seq', (select max(id) from ab_role)); select setval('ab_user_id_seq', (select max(id) from ab_user)); select setval('ab_user_role_id_seq', (select max(id) from ab_user_role)); select setval('ab_view_menu_id_seq', (select max(id) from ab_view_menu)); select setval('connection_id_seq', (select max(id) from connection)); select setval('dag_pickle_id_seq', (select max(id) from dag_pickle)); select setval('dag_run_id_seq', (select max(id) from dag_run)); select setval('import_error_id_seq', (select max(id) from import_error)); select setval('job_id_seq', (select max(id) from job)); select setval('log_id_seq', (select max(id) from log)); select setval('sensor_instance_id_seq', (select max(id) from sensor_instance)); select setval('slot_pool_id_seq', (select max(id) from slot_pool)); select setval('task_fail_id_seq', (select max(id) from task_fail)); select setval('task_reschedule_id_seq', (select max(id) from task_reschedule)); select setval('variable_id_seq', (select max(id) from variable)); - 恢复
airflow_pg的主外键约束-- 根据Airflow实际版本确认有哪些表 alter table ab_permission enable trigger all; alter table ab_permission_view enable trigger all; alter table ab_permission_view_role enable trigger all; alter table ab_register_user enable trigger all; alter table ab_role enable trigger all; alter table ab_user enable trigger all; alter table ab_user_role enable trigger all; alter table ab_view_menu enable trigger all; alter table alembic_version enable trigger all; alter table connection enable trigger all; alter table dag enable trigger all; alter table dag_code enable trigger all; alter table dag_pickle enable trigger all; alter table dag_run enable trigger all; alter table dag_tag enable trigger all; alter table import_error enable trigger all; alter table job enable trigger all; alter table log enable trigger all; alter table rendered_task_instance_fields enable trigger all; alter table sensor_instance enable trigger all; alter table serialized_dag enable trigger all; alter table sla_miss enable trigger all; alter table slot_pool enable trigger all; alter table task_fail enable trigger all; alter table task_instance enable trigger all; alter table task_reschedule enable trigger all; alter table variable enable trigger all; alter table xcom enable trigger all; alter table celery_taskmeta enable trigger all; alter table celery_tasksetmeta enable trigger all;
在完成上述五步后,可以将airflow.cfg文件中数据库地址改为PG地址,并重启Airflow服务,即可完成Airflow底层数据库的迁移和更换。
后记
在中国大陆技术生态中,提起数据库,MySQL一定是首先被考虑的,因为其历史原因,因为它的“简单”、“易用”;又因为一些历史的惯性,到2020年后还会有人选择MySQL 5.7作为业务数据库。
笔者并非站在技术主义制高点去指责这些技术乱象,然而,不假思索、不考虑具体业务场景和技术生态,就想“一库走天下”,这是极其不负责任的。
PG与MySQL孰高孰低自不必说,不了解OLAP领域和PG社区的开发者总是会想当然认为,MySQL与PG差别不是太大,但事实显然不是这样。
而在具体的技术生态中,例如Python、Ruby与PG的亲和性,以及Python两个MySQL包的实现差异,这些恐怕一般广泛意义上的开发者并不十分知晓。
所以,作为一名开发者,我仍然建议,要在技术领域多见见世面,多尝试一些新东西,多去深挖一些重要的细节。
而另一个有趣的问题,是关于技术路径的选择和具体实现。
比如这种跨库数据迁移,很多人想当然会使用各种dump工具,也会有一部分人会想到利用Kettle这样的工具来做。这些想法并未触及这个场景的本质——数据转换。
二进制能转换成文本吗?不能。
用Kettle鼠标操作点半天,用Spark 20分钟迁完了。
选择技术路径,最关键的是要理解技术场景中最根本的需求,比如这个场景的本质就是:把MySQL的数据完整转换成PG的数据,并且保证数据本身、主外键、自增索引不变。那这就不是一个数据迁移的问题了,而是数据序列化的问题。
认识到数据序列化的问题,再考虑到序列化的三个步骤,最简单的工具自然能够信手拈来。
而技术路径的选择与实现,考验的就是人对 “技术” 其本身的理解。