资讯专栏INFORMATION COLUMN

Python生成器实现数据处理管道

SwordFly / 616人阅读

摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个多带带的数据元素传递给迭代处理管道的下一阶段。

假设现在有如下业务场景

某生产系统的日志文件如下,并且在持续增加...

</>复制代码

  1. [ncms@UPZZGAP02 logs]$ pwd
  2. /home/ncms/ncms/logs
  3. [ncms@UPZZGAP02 logs]$ ll
  4. 总用量 797020
  5. -rw-rw-r-- 1 ncms ncms 495465795 11月 30 17:10 ansible.log
  6. -rw-rw-r-- 1 ncms ncms 2251937 11月 30 17:10 celery_beat.log
  7. -rw-rw-r-- 1 ncms ncms 16003 11月 15 10:26 celery_flower.log
  8. -rw-rw-r-- 1 ncms ncms 7042114 11月 30 17:10 celery_worker.log
  9. -rw-r--r-- 1 ncms ncms 24665873 11月 30 17:10 db_error.log
  10. -rw-r--r-- 1 ncms ncms 52428571 11月 28 18:46 db_error.log.1
  11. -rw-r--r-- 1 ncms ncms 52428691 11月 24 06:43 db_error.log.2
  12. -rw-r--r-- 1 ncms ncms 22410652 11月 19 15:16 db_error.log.3
  13. -rw-r--r-- 1 ncms ncms 28064985 11月 30 17:10 db_info.log
  14. -rw-r--r-- 1 ncms ncms 52426630 11月 28 13:29 db_info.log.1
  15. -rw-r--r-- 1 ncms ncms 52427357 11月 24 03:48 db_info.log.2
  16. -rw-r--r-- 1 ncms ncms 24276767 11月 19 15:16 db_info.log.3
  17. -rw-rw-r-- 1 ncms ncms 42490 11月 30 13:06 ncms_access.log
  18. -rw-rw-r-- 1 ncms ncms 24072 10月 30 15:33 ncms_error.log
  19. -rw-rw-r-- 1 ncms ncms 1350318 11月 30 16:38 nginx_access.log
  20. -rw-rw-r-- 1 ncms ncms 1685 11月 7 18:15 nginx_error.log
  21. -rw-rw-r-- 1 ncms ncms 24001 11月 15 10:27 supervisord.log
  22. -rw-rw-r-- 1 ncms ncms 645742 11月 30 16:38 uwsgi.log
  23. [ncms@UPZZGAP02 logs]$ du -sh *
  24. 473M ansible.log
  25. 2.2M celery_beat.log
  26. 16K celery_flower.log
  27. 6.8M celery_worker.log
  28. 24M db_error.log
  29. 51M db_error.log.1
  30. 51M db_error.log.2
  31. 22M db_error.log.3
  32. 27M db_info.log
  33. 51M db_info.log.1
  34. 51M db_info.log.2
  35. 24M db_info.log.3
  36. 44K ncms_access.log
  37. 24K ncms_error.log
  38. 1.3M nginx_access.log
  39. 4.0K nginx_error.log
  40. 24K supervisord.log
  41. 632K uwsgi.log
  42. [ncms@UPZZGAP02 logs]$

其中有应用、数据库、Celery、Nginx、uwsgi、supervisord、Ansible的日志,Ansible.log有473M,未来很定会更大。现在需要使用某些关键字对日志进行查找分析,应该如何做?

最简单粗暴的方式就是使用grep之类的命令,递归查找所有的.log文件,但这样会耗费大量内存,影响机器性能。

可以考虑使用数据管道 (类似 Unix 管道) 的方式迭代处理数据。使用Python生成器函数是一个实现管道机制

</>复制代码

  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # __author__ = "liao gao xiang"
  4. import os
  5. import fnmatch
  6. import gzip
  7. import bz2
  8. import re
  9. # 问题,你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据
  10. # 需要处理,但是不能将它们一次性放入内存中。可以使用生成器实现数据处理管道
  11. """ 文件格式如下
  12. foo/
  13. access-log-012007.gz
  14. access-log-022007.gz
  15. access-log-032007.gz
  16. ...
  17. access-log-012008
  18. bar/
  19. access-log-092007.bz2
  20. ...
  21. access-log-022008
  22. """
  23. def gen_find(filepat, top):
  24. """
  25. 查找符合Shell正则匹配的目录树下的所有文件名
  26. :param filepat: shell正则
  27. :param top: 目录路径
  28. :return: 文件绝对路径生成器
  29. """
  30. for path, _, filenames in os.walk(top):
  31. for file in fnmatch.filter(filenames, filepat):
  32. yield os.path.join(path, file)
  33. def gen_opener(filenames):
  34. """
  35. 每打开一个文件生成就生成一个文件对象,调用下一个迭代前关闭文件
  36. :param filenames: 多个文件绝对路径组成的可迭代对象
  37. :return: 文件对象生成器
  38. """
  39. for filename in filenames:
  40. if filename.endswith(".gz"):
  41. f = gzip.open(filename, "r", encoding="utf-8")
  42. elif filename.endswith(".bz2"):
  43. f = bz2.open(filename, "r", encoding="utf-8")
  44. else:
  45. f = open(filename, "r", encoding="utf-8")
  46. yield f
  47. f.close()
  48. def gen_concatenate(iterators):
  49. """
  50. 将输入序列拼接成一个很长的行序列。
  51. :param iterators:
  52. :return: 返回生成器所产生的所有值
  53. """
  54. for it in iterators:
  55. yield from it
  56. def gen_grep(pattern, lines):
  57. """
  58. 使用正则匹配行
  59. :param pattern: 正则匹配
  60. :param lines: 多行
  61. :return: 结果生成器
  62. """
  63. pat = re.compile(pattern)
  64. for n, line in enumerate(lines, start=1):
  65. if pat.search(line):
  66. yield n, line
  67. if __name__ == "__main__":
  68. filenames = gen_find("*.log", "/home/ncms/ncms/logs")
  69. files = gen_opener(filenames)
  70. lines = gen_concatenate(files)
  71. user_action = gen_grep("(?i)liaogaoxiang_kd", lines)
  72. for n, line in user_action:
  73. print(line)

查询包含用户 liaogaoxiang_kd 的所有记录,数据结果如下:

</>复制代码

  1. [views:post]:2018-11-07 18:13:09.841490 -users- liaogaoxiang_kd登录成功!
  2. [views:get]:2018-11-07 18:16:04.681519 -users- liaogaoxiang_kd访问了用户信息列表
  3. [views:post]:2018-11-07 18:16:23.866700 -users- liaogaoxiang_kd编辑了用户的信息
  4. [views:get]:2018-11-07 18:16:23.878949 -users- liaogaoxiang_kd访问了用户信息列表
  5. [views:get]:2018-11-07 18:16:25.641090 -users- liaogaoxiang_kd访问了用户信息列表
  6. [views:post]:2018-11-07 18:16:42.671377 -users- liaogaoxiang_kd编辑了用户的信息
  7. [views:get]:2018-11-07 18:16:42.719873 -users- liaogaoxiang_kd访问了用户信息列表
  8. [views:post]:2018-11-08 11:17:42.627693 -users- liaogaoxiang_kd登录成功!

如需查询其它错误信息,只需替换gen_grep("(?i)liaogaoxiang_kd", lines)中的关键字即可!以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时 轮询等。

为了理解上述代码,重点是要明白 yield 语句作为数据的生产者而 for 循环语句 作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个多带带的数据元 素传递给迭代处理管道的下一阶段。这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就 很容易编写和维护。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

使用这种方式的内存效率很高。上述代码即便是在一个超大型文件目录中 也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。 在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。 itertools.chain() 函数同样有类似的功能, 但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样 的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭 代步骤时文件就关闭了,因此 chain() 在这里不能这样使用。上面的方案可以避免这 种情况。

gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父 生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。

</>复制代码

  1. 程序员交流群,干货分享,加我拉你入群。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/19851.html

相关文章

  • Python成器实现数据处理管道

    摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个单独的数据元素传递给迭代处理管道的下一阶段。 假设现在有如下业务场景 某生产系统的日志文件如下,并且在持续增加... [ncms@UPZZGAP02 logs]$ pwd /home/ncms/ncms/logs [ncms@UPZZGAP02 logs]$ ll 总用...

    Lin_R 评论0 收藏0
  • Python成器实现数据处理管道

    摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个单独的数据元素传递给迭代处理管道的下一阶段。 假设现在有如下业务场景 某生产系统的日志文件如下,并且在持续增加... [ncms@UPZZGAP02 logs]$ pwd /home/ncms/ncms/logs [ncms@UPZZGAP02 logs]$ ll 总用...

    dreamtecher 评论0 收藏0
  • python迭代器与成器小结

    摘要:迭代器要说生成器,必须首先说迭代器区分与讲到迭代器,就需要区别几个概念看着都差不多,其实不然。比如常见就是与分离实现的本身是可迭代对象,但不是迭代器,类似与但是又不同。 2016.3.10关于例子解释的补充更新 源自我的博客 例子 老规矩,先上一个代码: def add(s, x): return s + x def gen(): for i in range(...

    hellowoody 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<