前言

作为一名数据清洗程序员,平时的主要工作是搬运和规范化数据,在 ETL 中处于 T(Transform) 和 L(Load) 的阶段。在我们的线上业务数据实时更新流程里,对于每张 MySQL 表都要事先对比新旧数据,只提交更新变化的字段,即所谓真实更新,然而因为祖传代码和同事之间开发规范不统一,对于真实更新的处理存在多份不同的实现,导致了大量重复代码和难以维护的问题,我决定封装这一业务逻辑,将其统一和规范化,解决以上暴露出的问题。

正文

设计思考

封装这一业务,需要考虑兼容性和可扩展性,其一要能够兼容所有数据维度(每个维度对应一张表),其二便于用户扩展添加模块的功能,以应对特殊业务处理逻辑需求的场景。以上两点是整个设计过程中必须记住的,否则又会是多了一份重复代码,使原有的问题更严重。

观察业务,处理的数据为 JSON 格式,映射到 Python 里就是一个字典结构,每次更新时都会拿到多条这样的新数据,再与数据库里的旧数据对比去重,并找出每条数据变化了的字段,最后请求 API 接口更新。原有的实现大同小异,都是直接处理 Python 字典数据,去重逻辑为遍历循环作对比,并且衍生了很多不够通用的辅助功能函数,晦涩难懂,不好测试,简直维护地狱。

有个概念叫 数据类, 每一种数据可以封装成一个类,便于对数据做复杂的逻辑处理,此次代码优化也是以此概念为基础,把处理 Python 字典转换成处理数据类,如此一来可以简化工作,这里推荐两篇文章

3.7 之后的 Python 版本中,有个名为 dataclasses 的模块,它就是数据类在 Python 里的通用模块实现,非常便于处理数据。最开始是 Python3.6 时候的一个第三方库,因为太好用且使用广泛,在 3.7 之后被加入进了标准库里。以下官方文档和原项目的 GitHub 链接,感兴趣可以看下。

PS: 我曾试图把该模块移植到 Python2,不过失败了 :(

失败的实现:zhiweio/dataclasses (github.com)

实现

因为数据更新都是走接口,所以这里和直接连接数据库更新有所不同,最后更新的数据要构造成接口规范的格式,下面代码里可能会有些具体业务上的细节,这些都是细枝末节,可以忽略,了解大致思路即可。

数据类

首先开始实现一个数据类雏形

class DataNode(object):
    def __init__(self, data, uniq_keys, eid=None, primary_key=None):
        self.data = copy.deepcopy(data)
        # 这里因为业务上需要所以必须有 eid,忽略即可
        # `eid` required
        if not self.data.get('eid') and isinstance(eid, basestring) and len(eid) == 36:
            self.assign(self.data, eid=eid)
        elif self.data.get('eid'):
            pass
        else:
            raise DataNodeError(
                'construct DataNode failed due to without eid or invaild eid')
        self.uniq_keys = uniq_keys
        self.pk = primary_key
        self.__unique = tuple([self.data[k] for k in self.uniq_keys])

    @property
    def unique(self):
        return self.__unique
    
    def assign(self, data, *args, **kwargs):
        """ assign value into DataNode.data

        Example:
        >>> assign(data, eid='123', pk=123)
        >>> assign(data, {'eid': '123', 'pk': 123})
        >>> assign([data, (eid', '123'), ('pk', 123)])
        """
        for x in args:
            if isinstance(x, collections.Iterable):
                kwargs.update(dict(x))
        for k, v in kwargs.iteritems():
            data[k] = v