Amos's Blog

大数据|容器化|♾️

0%

在项目的演进过程中,都会经历项目需求从少至多,由简单到复杂的过程。在这个不停追加的过程中,项目结构也会从一个小脚本就能搞定变化到需要由几个小脚本配合共同支撑,项目的代码也会从精简干练慢慢变到冗余、重合度高。

这时候,为了让项目结构更加稳健,易用性更高,可读性更好,是需要重建程序入口,把脚本工程化的。

看起来很复杂,实际上就是在项目演进的不同阶段适时地重构,每个优秀的项目都是经过千(多)锤(次)百(重)炼(构)才能形成的。

今天就从程序的入口开始,记录一次改造。(中间省略 1000 字为重构的细节及方法…)

在最初的版本,程序入口直接使用了简单粗暴的 if - else,带来的问题就是,无论如何封装抽象,都无法改变可读性极差的结果。于是在无力演进的情况下,选择一款 CLI 构建工具,提高可持续性,防止自己重造车轮。

CLI 构建工具 Click

Click 的基础使用方法和高级特性都可以从官方文档获取

以下谨记录个人使用到的,以及我认为最常用的操作。

基本用法

夏天来了,案例使用了冰淇淋,这样看起来会清凉一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import click

@click.group()
def test_group():
pass

@click.command()
@click.option('--brand', default=1, prompt='请选择品牌:1.和路雪 2.雀巢 3.五羊 4.其它', help='小卖部品牌分类:1.和路雪 2.雀巢 3.五羊 4.其它')
@click.option('--category', default=1, prompt='请选择类别:1.雪糕 2.冰棍 3.杯装', help='冰淇淋类别:1.雪糕 2.冰棍 3.杯装')
def ice_cream_selection(brand, category):
"""冰淇淋订购程序"""
print(brand, category)
if category == 3:
result = taste_selection()
print(result)
print("finished!")

@click.command()
@click.option('--selected', default=0, prompt='请选择口味:1.香草味 2.巧克力味')
def taste_selection(selected):
return click.echo(selected)

test_group.add_command(ice_cream_selection)
test_group.add_command(taste_selection)

if __name__ == '__main__':
ice_cream_selection()
#test_group()
阅读全文 »

listen 与 (frontend + backend) 的配置区别

frontend + backend 的配置示例及运行流程

这是一个 HTTP 代理在所有接口上监听端口 80 的简单配置,请求进入后的运行流程如下:

  1. frontend 监听 80 端口
  2. 请求被 frontend 转发到 backend 的 servers 中
  3. 请求进入运行在 127.0.0.1:8000 的 server1 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Simple configuration for an HTTP proxy listening on port 80 on all
# interfaces and forwarding requests to a single backend "servers" with a
# single server "server1" listening on 127.0.0.1:8000

global
daemon
maxconn 256

defaults
mode http
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms

frontend http-in
bind *:80
default_backend servers

backend servers
server server1 127.0.0.1:8000 maxconn 32

阅读全文 »

HAProxy 配置中有三个重要部分

  • global
  • defaults
  • listen or (frontend + backend)

常用配置详解

全局配置(global)

参数名 参数含义
maxconn 20480 默认最大连接数
daemon 以后台形式运行haproxy
nbproc 1 进程数量(可以设置多个进程提高性能)
pidfile /var/run/haproxy.pid haproxy 的 pid 存放路径,启动进程的用户必须有权限访问此文件

默认全局配置(defaults)

这些参数可以被利用配置到frontend,backend,listen组件

option 参数可以同时配置多个

balance 只可配置一个*

阅读全文 »

Airflow 是个很奇怪的东西,直到 1.10 它都没有权限系统,自带的创建用户和密码验证,只能作为一扇挡着外人的门。只要打开这扇门,里面的所有 Owner 下的 Dag 都会完完全全展示在你面前并且受你掌控。

究其原因,便是 airflow 自带的用户创建默认是 superuser,虽然话是这么说,创建个普通 user 就行了,但在一系列尝试后,我发现,它并没有提供其他等级的用户给你选择,只有 superuser 。。

所以在尝试了这一系列操作之后,我觉得有必要使用官方建议的 LDAP,虽然它看起来很麻烦的样子。

然而经过一番探查,我发现 LDAP 的作用并不是 filter dags by ower,他只是细粒度的分配登录权限。而真正的分发 Dag 到个人账户下,还是得等待 airflow 新版本的发布(当前版本该选项不生效)。


airflow webserver not filtering dags by owner

向 Graphite 写入数据

Graphite 的数据主要以 whisper 的格式存储在 carbon 这个组件中,而向其中填充数据的话我们有多种方式,这里演示两种常用手段。

使用 python

1
$ pip3 install graphyte

这里使用了 graphyte,用法很简单,只需要指定写入的 series 对应的值即可,写入的时间戳与 tag 是附加选项,可写可不写。

1
graphyte.send(series, value, timestamp, {'tag_name': tag_value, 'tag_name': 'tag_value'})

使用 curl

1
$ echo "foo.bar 3 `date +%s`" | nc 127.0.0.1 2003

当数据准备好了之后便按你的喜好来设置图形的样式

设置预警

进入到 Grafana 中的 Alert 标签下,主要需要设置以下几个参数

  1. Alert Config
  2. Conditions

如果想在设置的阈值触发时发送消息通知,则需要进一步设置 Notifications 以及 Alert RulesNotification channels

Alert Config

主要关注 Evaluate every 这个选项

我猜想 Grafana 的开发团队对它的定义为时效性高的预警通知工具,对于数据异常的及时发现,对比的间隔时间显然是越短越好。

它们希望你将比对的时间间隔缩短,30s、60s、120s,这样发现数据异常的速度也就越快。

今年年初,开发者在一条 issue 下做过这样一个答复

torkelo: you cannot control what time of day it will evaluate, it will evaluate every 24 hours. So if it was more than 24 hours since last evaluation it will evaluate (no matter what time of day)

所以,grafana 对小时级、天级或在精确时间点触发预警的支持是很不友好的

阅读全文 »

Graphite 是一款时序数据库

初步安装

安装 Graphite 有 n 种方法,在下选用的是 pip 安装的方式,使用默认安装位置 /opt/graphite

十分简单,只需要分别安装三个组件 graphite-webwhispercarbon

1
2
3
4
$ export PYTHONPATH="/opt/graphite/lib/:/opt/graphite/webapp/"
$ pip3 install --no-binary=:all: https://github.com/graphite-project/whisper/tarball/master
$ pip3 install --no-binary=:all: https://github.com/graphite-project/carbon/tarball/master
$ pip3 install --no-binary=:all: https://github.com/graphite-project/graphite-web/tarball/master

如果你希望自己指定安装位置:Installing From Pip

阅读全文 »

本文成型的历史原因来源于使用 shell 脚本封装单个 sqoop 的脑壳疼操作

在执行定时任务的时候,如果需要执行的 sqoop 足够多,我们可以将他们封装在一个 shell 脚本中,再使用 crontab 进行定时任务调用。

而如果同一时间需要执行的 sqoop 数量仅仅只有一两个,那我们完全没有必要为了它写一个 shell 脚本,当然只是按场景选择不同策略,并不代表哪种方法更差。

对于不想多建 shell 脚本的同学,还有 sqoop job 这条路。

阅读全文 »

Sqoop 的功能十分强大,可以帮助你完成不同数据库或数据仓库之间的数据同步任务。

总所周知的是 Sqoop 有两个版本

  • 1.4.x
  • 1.99.x

它们一个代表着 sqoop1,一个代表着sqoop2,它们功能性上的异同可简单归纳为以下几点,其余差异不在本文做过多的赘述。

功能 Sqoop 1 Sqoop 2
用于所有主要 RDBMS 的连接器 支持 不支持
解决办法: 使用已在以下数据库上执行测试的通用 JDBC 连接器: Microsoft SQL Server 、 PostgreSQL 、 MySQL 和 Oracle 。
Kerberos 安全集成 支持 不支持
数据从 RDBMS 传输至 Hive 或 HBase 支持 不支持
解决办法: 按照此两步方法操作。 将数据从 RDBMS 导入 HDFS 在 Hive 中使用相应的工具和命令(例如 LOAD DATA 语句),手动将数据载入 Hive 或 HBase
数据从 Hive 或 HBase 传输至 RDBMS 不支持
解决办法: 按照此两步方法操作。 从 Hive 或 HBase 将数据提取至 HDFS (作为文本或 Avro 文件) 使用 Sqoop 将上一步的输出导出至 RDBMS
不支持
按照与 Sqoop 1 相同的解决方法操作

关于两者在其他方面的异同可以参考此文:Sqoop1和Sqoop2的刨析对比

出发

本文的背景环境为 sqoop 1.4.6

sqoop 的命令格式十分简单,只需要往上累加需要的参数即可

阅读全文 »

由于大表关联,分析同学的执行操作要得出结果往往需要好几分钟,这在千万级、亿级数据量的表之间关联得出结果,不能说是很慢,但是依旧有着可提升的空间。

探索

Impala 中,有一个神秘指令,COMPUTE STATS

它可以预先分析表和列的的结构,并将其存储在元数据中。等到执行查询的时候, Impala 便会根据存储的元数据做出相应的查询优化。

也就是下面这条语句:

1
COMPUTE STATS t1;

执行前

show table stats t1;

#Rows #Files Size Bytes Cached Cache Replication Format Incremental stats Location
-1 4 1.72GB NOT CACHED NOT CACHED PARQUET false hdfs://nameservice1/user/hive/warehouse/t1

show column stats t1;

Column Type #Distinct Values #Nulls Max Size Avg Size
id BIGINT -1 -1 8 8
type INT -1 -1 4 4
uid BIGINT -1 -1 8 8

执行后

show table stats t1;

#Rows #Files Size Bytes Cached Cache Replication Format Incremental stats Location
32233129 4 1.72GB NOT CACHED NOT CACHED PARQUET false hdfs://nameservice1/user/hive/warehouse/t1

show column stats t1;

Column Type #Distinct Values #Nulls Max Size Avg Size
id BIGINT 45300013 0 8 8
type INT 14 0 4 4
uid BIGINT 2831250 0 8 8

分析

从上面的表格可以看出,compute stats 为我们缓存了几个较为常用的 count 值,不要小看这几个值。

在大型连表查询中,相比未经过 compute stats 优化的速度提升是几倍甚至十几倍,而相对 hive 的相同查询操作,速度差距将会达到几十倍。

Hive 依然适用

如果想在 hive 中执行,Impala 中查询,也可在 Hive 中执行操作

1
2
ANALYZE TABLE Table1 COMPUTE STATISTICS;
ANALYZE TABLE Table1 COMPUTE STATISTICS FOR COLUMNS;

了解更多

COMPUTE STATS Statement

就像这样,这小子坏得很。

1541489895790

初现端倪

好在 Task Instance Details 中可以看到这小子心里都在想些啥,这是十分关键的,以及具有重大突破性的调查入口!

1
depends_on_past is true for this task's DAG, but the previous task instance has not run yet.

做为一个新来的 dag,旗下每一个格子都是小白块,怎么可能会有前置依赖没执行?十分任性!

顺藤摸瓜

找到问题所在就好办了

前往代码中查看,我确实将 depends_on_past 置为 True` 了,但是每个 task 之间的依赖更是没有问题。

1
2
3
4
5
6
default_args = {
'owner': 'amos',
'depends_on_past': True,
# 'start_date': airflow.utils.dates.days_ago(1),
'start_date': datetime.now(),
}

尝试着将 depends_on_past 置为 False ,Airflow 立马开始了属于他自己的奔跑(按照依赖关系奔跑)!

别出纰漏

顺便一提,还有一种可能会导致这种情况的发生

那就是没有开启这个 dag,也就是 trigger 的状态为 off,这样会把任务挂起,直至开启 dag 才会执行。


了解更多

AirFlow DAG Get stuck in running state