Airflow 迁移:从 MySQL 5.7 到 PostgreSQL 13

2024-01-30
7分钟阅读时长

背景与场景

Airflow是数据工程领域内的一个非常受欢迎的调度框架,其对PostgreSQL(以下简称PG)的支持远好于MySQL。同时,在更广泛的全球化开发者社区内,尤其是在数据应用场景中,PG比MySQL更受人青睐。

而中国大陆的技术环境内,谈起数据库,尽管PG比MySQL的功能更强大、生态更完善,但MySQL的开发者亲和性似乎更胜一筹。这就造成了Airflow + MySQL的技术组合,尽管这种技术组合能够满足基本的程序运行,但在大规模、高频率调度场景和使用Airflow高级功能时,总会遇到各种难缠的问题。

笔者手边恰好有一套这样的环境,这套环境运行着相对老旧的(四年前的)Airflow版本,这次趁版本升级之际,有机会把一些曾经屡次出现的问题一劳永逸解决掉,也就是说,可以做一次从MySQL到PG的数据迁移。

技术路径分析

RDMBS的导出与导入

从RDBMS的角度分析这个问题,最简单的做法可以是:把数据从MySQL导出,然后再导入PG。

这种做法在今天看来,存在三个问题:

  1. 导出和导入的载体是文本,无法承载二进制数据(如JSON、Binary等数据类型)
  2. 对于大单表缺乏并发性,性能低
  3. 对于跨库场景,无法做到严谨的类型映射

这三个问题恰恰是这个数据迁移场景中最关键的三个问题。

二进制的做法

在这个场景中,最关键的一点是,对于二进制数据类型,考虑到信息丢失和JSON转化的麻烦,肯定是不能做文本化处理,因此一定需要使用二进制的数据传输方式。

经过思考和调研,一共发现三种实现途径:

  • Binary Serialization
  • pgloader
  • Spark

Binary Serialization

既然不能转换为文本方式,那么可以用更安全的二进制序列化方式实现。

秉着这个原则,笔者经过检索,最终确认,可以使用protobufmsgpack等库来实现二进制序列化。那么其实在这种场景中,只需要三步即可实现迁移过程:

  1. 用JDBC将数据从MySQL读出来
  2. 二进制序列化
  3. 用JDBC将数据写入PG

以上三步确实可以完成整个迁移,不过在这之外,还会有一些workaround或者陷阱。

用Python语言可以吗?

理论上可以,实际上会比较困难。

Python用以对接数据库的驱动包,本质上实现的是数据库的通信协议。对于MySQL来说,Python有两个不同的包,分别用C和Python实现了MySQL的通信协议。而不幸的是,笔者曾在过往的开发经验中,遇到过这两个版本驱动包在协议实现上的差异,这在数据不能有分毫误差的场景里是无法接受的。

此外,Python软件生态惯有的低质量给笔者留下了非常不好的印象,笔者无法在严谨的工程中不经测试直接信任Python的软件包。

用ORM框架可以吗?

或许可以。

对于一个成熟的ORM框架来说,应当处理好常见数据库的各种数据类型。但对于这种一次性操作来说,考虑到Python领域没有特别好用的ORM框架,Java的ORM又都特别繁重,引入一个ORM框架实在是徒增复杂性。

经过评估,引入ORM的开发量和使用二进制序列化的开发繁琐程度没有差太多。

读出数据直接INSRET INTO可以吗?

不可以。

写成INSERT INTO的形式,就是文本序列化。

pgloader

pgloader是PG生态里的一个工具,由Common Lisp写成,专门用于针对PG的数据迁移场景。

原本对该方法寄予厚望,但在尝试过程中,屡次遇到无法解决的奇怪bug,遂作罢。

Spark

在如今Datalake大行其道的时代,数据搬运是一件再平常不过的事情。而在这个具体的场景中,前文说到使用Binary Serialization方法中的三步,Spark恰好能完美契合,所以这就成了试错成本最低的方法:只需用Spark将数据读出来、写进去就好。

后来的实践证明,这是最简单、最可行、综合成本最低的方法。

生产环境实践操作

本小节代码仅做演示,不代表真实运行情况。

实际环境的代码应当环境实际情况进行调整。

借用Spark的能力,生产环境实操变得非常简单,共包含以下几步:

  1. 在PG中建立新的Airflow数据库,名为airflow_pg,注意这个数据库需要是空的
    # 1. 初始化Airflow数据库
    airflow db init
    # 2. 导出数据库表结构
    pg_dump -d airflow_pg -s -f airflow_pg_schema.sql
    # 3.删除刚刚新建的数据库,使用表结构重建Airflow数据库
    psql -c 'DROP DATABASE airflow_pg; CREATE DATABASE airflow_pg'
    psql -c '\c airflow_pg' -f airflow_pg_schema.sql
    
  2. airflow_pg解除主外键约束
    -- 根据Airflow实际版本确认有哪些表
    alter table ab_permission disable trigger all;
    alter table ab_permission_view disable trigger all;
    alter table ab_permission_view_role disable trigger all;
    alter table ab_register_user disable trigger all;
    alter table ab_role disable trigger all;
    alter table ab_user disable trigger all;
    alter table ab_user_role disable trigger all;
    alter table ab_view_menu disable trigger all;
    alter table alembic_version disable trigger all;
    alter table connection disable trigger all;
    alter table dag disable trigger all;
    alter table dag_code disable trigger all;
    alter table dag_pickle disable trigger all;
    alter table dag_run disable trigger all;
    alter table dag_tag disable trigger all;
    alter table import_error disable trigger all;
    alter table job disable trigger all;
    alter table log disable trigger all;
    alter table rendered_task_instance_fields disable trigger all;
    alter table sensor_instance disable trigger all;
    alter table serialized_dag disable trigger all;
    alter table sla_miss disable trigger all;
    alter table slot_pool disable trigger all;
    alter table task_fail disable trigger all;
    alter table task_instance disable trigger all;
    alter table task_reschedule disable trigger all;
    alter table variable disable trigger all;
    alter table xcom disable trigger all;
    alter table celery_taskmeta disable trigger all;
    alter table celery_tasksetmeta disable trigger all;
    
  3. 使用Spark读取airflow_mysql数据,并依次写入airflow_pg
    # 根据Airflow实际情况确定哪些表需要迁移
    # 注意:
    #   1. 如`log`、`celery_taskmeta`表不需要迁移
    #   2. option('stringtype', 'unspecified')的作用是正确处理表中二进制字段
    df = spark.read.format('jdbc')...
    df.write.format('jdbc')\
        .option('stringtype', 'unspecified')\
        ...
    
  4. airflow_pg中指定各个自增主键的位置,对应到各个表ID的最大值
    -- 根据Airflow实际版本确认有哪自增主键
     select setval('ab_permission_id_seq', (select max(id) from ab_permission));
     select setval('ab_permission_view_id_seq', (select max(id) from ab_permission_view));
     select setval('ab_permission_view_role_id_seq', (select max(id) from ab_permission_view_role));
     select setval('ab_register_user_id_seq', (select max(id) from ab_register_user));
     select setval('ab_role_id_seq', (select max(id) from ab_role));
     select setval('ab_user_id_seq', (select max(id) from ab_user));
     select setval('ab_user_role_id_seq', (select max(id) from ab_user_role));
     select setval('ab_view_menu_id_seq', (select max(id) from ab_view_menu));
     select setval('connection_id_seq', (select max(id) from connection));
     select setval('dag_pickle_id_seq', (select max(id) from dag_pickle));
     select setval('dag_run_id_seq', (select max(id) from dag_run));
     select setval('import_error_id_seq', (select max(id) from import_error));
     select setval('job_id_seq', (select max(id) from job));
     select setval('log_id_seq', (select max(id) from log));
     select setval('sensor_instance_id_seq', (select max(id) from sensor_instance));
     select setval('slot_pool_id_seq', (select max(id) from slot_pool));
     select setval('task_fail_id_seq', (select max(id) from task_fail));
     select setval('task_reschedule_id_seq', (select max(id) from task_reschedule));
     select setval('variable_id_seq', (select max(id) from variable));
    
  5. 恢复airflow_pg的主外键约束
    -- 根据Airflow实际版本确认有哪些表
     alter table ab_permission enable trigger all;
     alter table ab_permission_view enable trigger all;
     alter table ab_permission_view_role enable trigger all;
     alter table ab_register_user enable trigger all;
     alter table ab_role enable trigger all;
     alter table ab_user enable trigger all;
     alter table ab_user_role enable trigger all;
     alter table ab_view_menu enable trigger all;
     alter table alembic_version enable trigger all;
     alter table connection enable trigger all;
     alter table dag enable trigger all;
     alter table dag_code enable trigger all;
     alter table dag_pickle enable trigger all;
     alter table dag_run enable trigger all;
     alter table dag_tag enable trigger all;
     alter table import_error enable trigger all;
     alter table job enable trigger all;
     alter table log enable trigger all;
     alter table rendered_task_instance_fields enable trigger all;
     alter table sensor_instance enable trigger all;
     alter table serialized_dag enable trigger all;
     alter table sla_miss enable trigger all;
     alter table slot_pool enable trigger all;
     alter table task_fail enable trigger all;
     alter table task_instance enable trigger all;
     alter table task_reschedule enable trigger all;
     alter table variable enable trigger all;
     alter table xcom enable trigger all;
     alter table celery_taskmeta enable trigger all;
     alter table celery_tasksetmeta enable trigger all;
    

在完成上述五步后,可以将airflow.cfg文件中数据库地址改为PG地址,并重启Airflow服务,即可完成Airflow底层数据库的迁移和更换。

后记

在中国大陆技术生态中,提起数据库,MySQL一定是首先被考虑的,因为其历史原因,因为它的“简单”、“易用”;又因为一些历史的惯性,到2020年后还会有人选择MySQL 5.7作为业务数据库。

笔者并非站在技术主义制高点去指责这些技术乱象,然而,不假思索、不考虑具体业务场景和技术生态,就想“一库走天下”,这是极其不负责任的。

PG与MySQL孰高孰低自不必说,不了解OLAP领域和PG社区的开发者总是会想当然认为,MySQL与PG差别不是太大,但事实显然不是这样。

而在具体的技术生态中,例如Python、Ruby与PG的亲和性,以及Python两个MySQL包的实现差异,这些恐怕一般广泛意义上的开发者并不十分知晓。

所以,作为一名开发者,我仍然建议,要在技术领域多见见世面,多尝试一些新东西,多去深挖一些重要的细节。

而另一个有趣的问题,是关于技术路径的选择和具体实现。

比如这种跨库数据迁移,很多人想当然会使用各种dump工具,也会有一部分人会想到利用Kettle这样的工具来做。这些想法并未触及这个场景的本质——数据转换。

二进制能转换成文本吗?不能。

用Kettle鼠标操作点半天,用Spark 20分钟迁完了。

选择技术路径,最关键的是要理解技术场景中最根本的需求,比如这个场景的本质就是:把MySQL的数据完整转换成PG的数据,并且保证数据本身、主外键、自增索引不变。那这就不是一个数据迁移的问题了,而是数据序列化的问题。

认识到数据序列化的问题,再考虑到序列化的三个步骤,最简单的工具自然能够信手拈来。

而技术路径的选择与实现,考验的就是人对 “技术” 其本身的理解。

comments powered by Disqus