作为一名数据清洗程序员,平时的主要工作是搬运和规范化数据,在 ETL 中处于 T(Transform) 和 L(Load) 的阶段。在我们的线上业务数据实时更新流程里,对于每张 MySQL 表都要事先对比新旧数据,只提交更新变化的字段,即所谓真实更新,然而因为祖传代码和同事之间开发规范不统一,对于真实更新的处理存在多份不同的实现,导致了大量重复代码和难以维护的问题,我决定封装这一业务逻辑,将其统一和规范化,解决以上暴露出的问题。
封装这一业务,需要考虑兼容性和可扩展性,其一要能够兼容所有数据维度(每个维度对应一张表),其二便于用户扩展添加模块的功能,以应对特殊业务处理逻辑需求的场景。以上两点是整个设计过程中必须记住的,否则又会是多了一份重复代码,使原有的问题更严重。
观察业务,处理的数据为 JSON 格式,映射到 Python 里就是一个字典结构,每次更新时都会拿到多条这样的新数据,再与数据库里的旧数据对比去重,并找出每条数据变化了的字段,最后请求 API 接口更新。原有的实现大同小异,都是直接处理 Python 字典数据,去重逻辑为遍历循环作对比,并且衍生了很多不够通用的辅助功能函数,晦涩难懂,不好测试,简直维护地狱。
有个概念叫 数据类, 每一种数据可以封装成一个类,便于对数据做复杂的逻辑处理,此次代码优化也是以此概念为基础,把处理 Python 字典转换成处理数据类,如此一来可以简化工作,这里推荐两篇文章
在 3.7 之后的 Python 版本中,有个名为 dataclasses 的模块,它就是数据类在 Python 里的通用模块实现,非常便于处理数据。最开始是 Python3.6 时候的一个第三方库,因为太好用且使用广泛,在 3.7 之后被加入进了标准库里。以下官方文档和原项目的 GitHub 链接,感兴趣可以看下。
PS: 我曾试图把该模块移植到 Python2,不过失败了 :(
失败的实现:zhiweio/dataclasses (github.com)
因为数据更新都是走接口,所以这里和直接连接数据库更新有所不同,最后更新的数据要构造成接口规范的格式,下面代码里可能会有些具体业务上的细节,这些都是细枝末节,可以忽略,了解大致思路即可。
首先开始实现一个数据类雏形
class DataNode(object):
def __init__(self, data, uniq_keys, eid=None, primary_key=None):
self.data = copy.deepcopy(data)
# 这里因为业务上需要所以必须有 eid,忽略即可
# `eid` required
if not self.data.get('eid') and isinstance(eid, basestring) and len(eid) == 36:
self.assign(self.data, eid=eid)
elif self.data.get('eid'):
pass
else:
raise DataNodeError(
'construct DataNode failed due to without eid or invaild eid')
self.uniq_keys = uniq_keys
self.pk = primary_key
self.__unique = tuple([self.data[k] for k in self.uniq_keys])
@property
def unique(self):
return self.__unique
def assign(self, data, *args, **kwargs):
""" assign value into DataNode.data
Example:
>>> assign(data, eid='123', pk=123)
>>> assign(data, {'eid': '123', 'pk': 123})
>>> assign([data, (eid', '123'), ('pk', 123)])
"""
for x in args:
if isinstance(x, collections.Iterable):
kwargs.update(dict(x))
for k, v in kwargs.iteritems():
data[k] = v