
Commit a9ee01c

flink demo
1 parent a71aa23 commit a9ee01c

File tree

8 files changed: +452 −0 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ tmp
 *.iml
 *.swp
 *.jar
+!flink-demo-udf.jar
 *.zip
 *.log
 *.pyc

flink-demo/README.md

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
## 1. Flink standalone environment setup (based on Flink 1.11.1)

(a) Download the Flink distribution
* Flink distribution: https://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz

(b) Download the JDBC connector jars
* JDBC connector jar: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.1/flink-connector-jdbc_2.11-1.11.1.jar
* MySQL driver jar: https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
* PostgreSQL driver jar: https://jdbc.postgresql.org/download/postgresql-42.2.14.jar

(c) Download the Kafka connector jar
* Kafka connector jar: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.11.1/flink-sql-connector-kafka_2.11-1.11.1.jar

## 2. Test data preparation

(a) The changelog data in Kafka is the binlog of the MySQL orders table, captured by the Debezium connector.

(1) Start the docker containers:

    docker-compose -f docker-compose-flink-demo.yaml up

(2) Register the Debezium connector so it starts capturing the changelog and publishing it to Kafka:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

(3) Inspect the changelog in Kafka:

    docker-compose -f docker-compose-flink-demo.yaml exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.inventory.orders
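Each record on this topic is a Debezium change envelope carrying the before/after row images and an operation flag ('c' insert, 'u' update, 'd' delete). Abridged and for illustration only (schema and source metadata elided; values follow Debezium's stock inventory sample data and may differ per run), a record looks roughly like:

    {"schema": {...}, "payload": {"before": null, "after": {"order_number": 10001, "order_date": 16816, "purchaser": 1001, "quantity": 1, "product_id": 102}, "source": {...}, "op": "c", "ts_ms": 1596000000000}}

Note that order_date arrives as an INT counting days since the Unix epoch, which is why the int2Date UDF defined below is needed.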
(b) The dimension data consists of a customers table and a products table.

(1) Inspect the data in the dimension tables:

    docker-compose -f docker-compose-flink-demo.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

## 3. Author the SQL job in the SQL Client

    ./bin/start-cluster.sh
    ./bin/sql-client.sh embedded -j flink-demo-udf.jar

(a) Create the Flink tables and the function

```
-- Orders table, read from the Debezium changelog topic:
CREATE TABLE orders (
  order_number INT NOT NULL,
  order_date INT NOT NULL,
  purchaser INT NOT NULL,
  quantity INT NOT NULL,
  product_id INT NOT NULL,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);

-- Dimension table: customers
CREATE TABLE `customers` (
  `id` INT NOT NULL,
  `first_name` VARCHAR(255) NOT NULL,
  `last_name` VARCHAR(255) NOT NULL,
  `email` VARCHAR(255) NOT NULL
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/inventory',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'mysqluser',
  'password' = 'mysqlpw',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '100',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);

-- Dimension table: products
CREATE TABLE `products` (
  `id` INT NOT NULL,
  `name` VARCHAR(255) NOT NULL,
  `description` VARCHAR(512) NOT NULL,
  `weight` FLOAT NOT NULL
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/inventory',
  'username' = 'mysqluser',
  'password' = 'mysqlpw',
  'table-name' = 'products',
  'lookup.cache.max-rows' = '100',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);

-- UDF that converts an INT (days since epoch) to a DATE
CREATE FUNCTION int2Date AS 'udf.Int2DateUDF';
```
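This commit ships the compiled function only (flink-demo-udf.jar); its source is not part of the diff. For orientation, a minimal sketch of what `udf.Int2DateUDF` might look like, assuming it simply interprets the INT as days since the Unix epoch (the encoding Debezium uses for MySQL DATE columns):

```java
package udf;

import java.sql.Date;
import java.time.LocalDate;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical reconstruction -- the real implementation lives in
// flink-demo-udf.jar and may differ. Converts the Debezium encoding of a
// MySQL DATE (an INT counting days since 1970-01-01) into a SQL DATE.
public class Int2DateUDF extends ScalarFunction {
    public Date eval(Integer daysSinceEpoch) {
        if (daysSinceEpoch == null) {
            return null; // propagate SQL NULL
        }
        return Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch));
    }
}
```

Packaged by the maven-jar-plugin configuration in flink-demo/pom.xml (finalName flink-demo-udf), this is the jar passed to the SQL Client via `-j flink-demo-udf.jar`.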

(b) The result table is obtained from the PostgreSQL catalog, so it does not need to be created in Flink:
```
CREATE CATALOG mypg WITH (
  'type' = 'jdbc',
  'property-version' = '1',
  'base-url' = 'jdbc:postgresql://localhost:5432/',
  'default-database' = 'postgres',
  'username' = 'postgres',
  'password' = 'postgres'
);
```
(c) Create the result table on the PostgreSQL side:
```
docker-compose -f docker-compose-flink-demo.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'

create table enrich_orders(
  order_number integer PRIMARY KEY,
  order_date date NOT NULL,
  purchaser_name varchar(255) NOT NULL,
  purchaser_email varchar(255) NOT NULL,
  quantity integer NOT NULL,
  product_name varchar(255) NOT NULL,
  product_weight DECIMAL(10, 4) NOT NULL,
  total_weight DECIMAL(10, 4) NOT NULL
);
```
(d) Submit the job to the cluster
```
insert into mypg.postgres.`inventory.enrich_orders`
select
  order_number,
  int2Date(order_date),
  concat(c.first_name, ' ', c.last_name) as purchaser_name,
  c.email,
  quantity,
  p.name,
  p.weight,
  p.weight * quantity as total_weight
from orders as o
join customers FOR SYSTEM_TIME AS OF o.proc_time c on o.purchaser = c.id
join products FOR SYSTEM_TIME AS OF o.proc_time p on o.product_id = p.id;
```
(e) Verify that enrich_orders now holds the enriched (post-ETL) data:

    docker-compose -f docker-compose-flink-demo.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
    select * from enrich_orders;

## 4. Test CDC data sync and the dimension-table join

(a) Insert and update orders

(1) Open a MySQL shell and modify some orders:

    docker-compose -f docker-compose-flink-demo.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
    insert into orders (order_date, purchaser, quantity, product_id) values ('2020-07-30', 1001, 5, 108);
    update orders set quantity = 50 where order_number = 10005;
    update orders set quantity = quantity - 1;
    update orders set quantity = quantity + 11;

(2) Check that the data in the PG result table is updated in sync:

    docker-compose -f docker-compose-flink-demo.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'

(b) Dimension-table updates

(1) Update the customer's email address:

    update customers set email = '[email protected]' where id = 1001;

(2) The customer places another order:

    insert into orders (order_date, purchaser, quantity, product_id) values ('2020-07-30', 1001, 5, 106);

(3) Check the purchaser email on the new order:

    docker-compose -f docker-compose-flink-demo.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
flink-demo/docker-compose-flink-demo.yaml

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: debezium/connect:1.1
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9094
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

flink-demo/flink-demo-udf.jar

2.29 KB
Binary file not shown.

flink-demo/pom.xml

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-sql-etl</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-demo</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <finalName>flink-demo-udf</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

flink-demo/register-mysql.json

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
{
    "name": "mysql-inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9094",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
