3.odps到StarRocks
白名单数据同步
登录 tunnel:bin/odpscmd
- 下载数据:
tunnel download jst_biz_cdw_dev.dim_jst_erp_dataproduct_inventory_whitelist/bizhour=2023010311 data/dim_jst_erp_dataproduct_inventory_whitelist.txt;
- StarRocks 建表
CREATE TABLE IF NOT EXISTS dim_jst_erp_dataproduct_inventory_whitelist (
co_id BIGINT ,
co_name STRING ,
version INT ,
start_time DATETIME COMMENT '订阅开始时间',
end_time DATETIME COMMENT '订阅开始时间'
)
DUPLICATE KEY(co_id, co_name)
DISTRIBUTED BY HASH(co_id) BUCKETS 8;
- 通过 Stream Load 导入数据
curl --location-trusted -u root:jst@123456 -T dim_jst_erp_dataproduct_inventory_whitelist.txt -H "column_separator: " http://127.0.0.1:8030/api/jst_biz/dim_jst_erp_dataproduct_inventory_whitelist/_stream_load
库存日均 dt 同步
- 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dws_jst_erp_whse_sku_inventory_statistics_dt data/dws_jst_erp_whse_sku_inventory_statistics_dt_20230116.txt;
- StarRocks 建表
CREATE TABLE IF NOT EXISTS dws_jst_erp_whse_sku_inventory_statistics_dt
(
co_id BIGINT COMMENT '商家编号',
sku_id STRING COMMENT '商品编码',
pic STRING COMMENT '商品图片',
sku_name STRING COMMENT '商品名称',
properties_value STRING COMMENT '颜色规格',
i_id STRING COMMENT '款式编码',
sku_labels STRING COMMENT '商品标签',
brand_name STRING COMMENT '商品品牌',
category STRING COMMENT '商品分类',
c_id BIGINT COMMENT '分类id',
pay_avg_qty_3d DECIMAL(38,4) COMMENT '近3天日均销售件数',
pay_avg_qty_7d DECIMAL(38,4) COMMENT '近7天日均销售件数',
pay_avg_qty_15d DECIMAL(38,4) COMMENT '近15天日均销售件数',
pay_avg_qty_30d DECIMAL(38,4) COMMENT '近30天日均销售件数',
pay_avg_qty_90d DECIMAL(38,4) COMMENT '近90天日均销售件数',
return_avg_qty_3d DECIMAL(38,4) COMMENT '近3天日均退货件数',
return_avg_qty_7d DECIMAL(38,4) COMMENT '近7天日均退货件数',
return_avg_qty_15d DECIMAL(38,4) COMMENT '近15天日均退货件数',
return_avg_qty_30d DECIMAL(38,4) COMMENT '近30天日均退货件数',
update_time DATETIME COMMENT '更新时间',
pt INT COMMENT '分区'
)
DUPLICATE KEY(co_id, sku_id)
DISTRIBUTED BY HASH(co_id) BUCKETS 16;
- 通过脚本清洗换行符
python etl.py dws_jst_erp_whse_sku_inventory_statistics_dt_20230116.txt dws_jst_erp_whse_sku_inventory_statistics_dt_20230116_.txt - 通过 Stream Load 导入数据
curl --location-trusted -u root:jst@123456 -T dws_jst_erp_whse_sku_inventory_statistics_dt_20230116_.txt -H "column_separator:^" http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_sku_inventory_statistics_dt/_stream_load
未发货订单同步
- 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dws_jst_erp_whse_order_sku_ri_test data/dws_jst_erp_whse_order_sku_ri_test.txt;
- StarRocks 建表
CREATE TABLE IF NOT EXISTS dws_jst_erp_whse_order_sku_ri
(
co_id INT ,
o_id BIGINT ,
oi_id BIGINT ,
sku_id STRING ,
skus STRING ,
sku_qty BIGINT ,
sku_valid INT ,
oi_pay_date DATETIME,
oi_send_date DATETIME,
oi_plan_delivery_date DATETIME,
oi_status STRING ,
oi_ts BIGINT ,
update_time DATETIME
) PRIMARY KEY (co_id, o_id,oi_id,sku_id)
DISTRIBUTED BY HASH(co_id) BUCKETS 16
;
- 通过脚本合并文件、清洗换行符
cat dws_jst_erp_whse_order_sku_ri_test.*.txt > all_dws_jst_erp_whse_order_sku_ri_test.txtpython etl.py all_dws_jst_erp_whse_order_sku_ri_test.txt all_dws_jst_erp_whse_order_sku_ri_test_.txt - 通过 Stream Load 导入数据
curl --location-trusted -u root:jst@123456 -T all_dws_jst_erp_whse_order_sku_ri_test_.txt -H "column_separator:^" -H "columns: co_id, o_id, oi_id,skus,sku_id,sku_qty,sku_valid,oi_pay_date,oi_send_date,oi_plan_delivery_date,oi_status,oi_ts,update_time" http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_order_sku_ri/_stream_load
库存数据初始化同步
- 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dwd_jst_erp_whse_inventory_init_ht data/dwd_jst_erp_whse_inventory_init_ht.txt;
- StarRocks 建表
CREATE TABLE IF NOT EXISTS dws_jst_erp_whse_inventory_ri(
co_id BIGINT COMMENT '商家编号',
sku_id STRING COMMENT '商品编码',
i_id STRING COMMENT '款式编码',
sku_name STRING COMMENT '商品名称',
properties_value STRING COMMENT '颜色及规格',
pic STRING COMMENT '图片路径',
qty BIGINT COMMENT '数量',
in_qty BIGINT COMMENT '进货仓库存',
return_qty BIGINT COMMENT '销退仓库存',
defective_qty BIGINT COMMENT '次品库存',
order_lock BIGINT COMMENT '订单占有数',
pick_lock BIGINT COMMENT '仓库待发数',
min_qty BIGINT COMMENT '安全库存下限',
max_qty BIGINT COMMENT '安全库存上限',
virtual_qty BIGINT COMMENT '虚拟库存数',
purchase_qty BIGINT COMMENT '采购在途数',
min_day BIGINT COMMENT '安全天数下限',
max_day BIGINT COMMENT '安全天数上限',
allocate_qty BIGINT COMMENT '调拨在途数',
customize_qty_1 BIGINT COMMENT '自定义仓1库存',
customize_qty_2 BIGINT COMMENT '自定义仓2库存',
customize_qty_3 BIGINT COMMENT '自定义仓3库存',
created STRING COMMENT '创建时间',
modified STRING COMMENT '修改时间',
ts BIGINT COMMENT '时间戳',
db_id INT COMMENT '业务分库号',
tb_id INT COMMENT '业务分表号',
del_flag TINYINT COMMENT '删除标志:0=正常单,1=业务删除,2=物理删除',
stock BIGINT COMMENT '可用库存,根据商家配置表设置的逻辑来计算',
update_time DATETIME COMMENT '更新时间'
,pt INT COMMENT '分区'
) PRIMARY KEY (co_id, sku_id)
DISTRIBUTED BY HASH(co_id) BUCKETS 16
;
- 通过脚本合并文件、清洗换行符
python etl.py dwd_jst_erp_whse_inventory_init_ht.txt dwd_jst_erp_whse_inventory_init_ht_.txt - 通过 Stream Load 导入数据
curl --location-trusted -u root:jst@123456 -T dwd_jst_erp_whse_inventory_init_ht_.txt -H "column_separator:^" http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_inventory_ri/_stream_load
性能测试
测试 SQL1
with qty_data as (
select
sku_id,
pic ,
sku_name ,
properties_value ,
i_id ,
sku_labels ,
brand_name ,
c_id,
category ,
round(pay_avg_qty_3d,1) as pay_avg_qty_3d ,
round(pay_avg_qty_7d,1) as pay_avg_qty_7d ,
round(pay_avg_qty_15d,1) as pay_avg_qty_15d ,
round(pay_avg_qty_30d,1) as pay_avg_qty_30d ,
round(pay_avg_qty_90d,1) as pay_avg_qty_90d,
return_avg_qty_3d ,
return_avg_qty_7d ,
return_avg_qty_15d ,
return_avg_qty_30d ,
qty - no_sent_qty as availableQty,
qty as real_qty,
case when (no_sent_qty>0 and no_sent_qty-(qty)>0 ) then no_sent_qty-(qty) else 0 end as stockout_qty,
pay_avg_qty_30d as pay_avg_qty,
case when pay_avg_qty_30d=0 then 0
when (qty - no_sent_qty)<=0 then 0
else (qty - no_sent_qty)/pay_avg_qty_30d end as inventory_available_day,
no_sent_qty,
today_pay_qty,
will_sent_qty_1d,
will_sent_qty_1d_3d,
will_sent_qty_3d_7d,
will_sent_qty_7d,
qty,
virtual_qty,
in_qty,
purchase_qty,
allocate_qty,
return_qty
from(
select
COALESCE(a.sku_id,b.sku_id,c.sku_id) as sku_id ,
c.pic ,
c.sku_name ,
c.properties_value ,
c.i_id ,
sku_labels ,
brand_name ,
c_id,
category ,
return_avg_qty_3d ,
return_avg_qty_7d ,
return_avg_qty_15d ,
return_avg_qty_30d ,
will_sent_qty_1d,
will_sent_qty_1d_3d,
will_sent_qty_3d_7d,
will_sent_qty_7d,
COALESCE(qty,0) AS qty,
COALESCE(virtual_qty,0) AS virtual_qty,
COALESCE(in_qty,0) AS in_qty,
COALESCE(purchase_qty,0) AS purchase_qty,
COALESCE(allocate_qty,0) AS allocate_qty,
COALESCE(return_qty,0) AS return_qty,
COALESCE(no_sent_qty,0) AS no_sent_qty,
COALESCE(pay_avg_qty_90d,0) AS pay_avg_qty_90d,
COALESCE(pay_avg_qty_30d,0) AS pay_avg_qty_30d,
COALESCE(pay_avg_qty_15d,0) AS pay_avg_qty_15d,
COALESCE(pay_avg_qty_7d,0) AS pay_avg_qty_7d,
COALESCE(pay_avg_qty_3d,0) AS pay_avg_qty_3d,
COALESCE(today_pay_qty,0) AS today_pay_qty
from jst_biz.dws_jst_erp_whse_inventory_ri a
FULL JOIN(
select
co_id,
sku_id,
sum(case when oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT') THEN sku_qty else 0 end) as no_sent_qty,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= date_add(current_timestamp(), INTERVAL 1 DAY) THEN sku_qty else 0 end) as will_sent_qty_1d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > date_add(current_timestamp(), INTERVAL 1 DAY) and oi_plan_delivery_date <= date_add(current_timestamp(), INTERVAL 3 DAY) THEN sku_qty else 0 end) as will_sent_qty_1d_3d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= date_add(current_timestamp(), INTERVAL 3 DAY) and oi_plan_delivery_date <= date_add(current_timestamp(), INTERVAL 7 DAY) THEN sku_qty else 0 end) as will_sent_qty_3d_7d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > date_add(current_timestamp(), INTERVAL 7 DAY) THEN sku_qty else 0 end) as will_sent_qty_7d,
sum(case when oi_pay_date>CURDATE() then sku_qty else 0 end) as today_pay_qty
from jst_biz.dws_jst_erp_whse_order_sku_ri
where co_id=10962266 and sku_valid=1
GROUP BY co_id,sku_id
) b on a.co_id=b.co_id and a.sku_id=b.sku_id
FULL JOIN jst_biz.dws_jst_erp_whse_sku_inventory_statistics_dt c on a.co_id=c.co_id and a.sku_id=c.sku_id
where a.co_id=10962266 and c.co_id=10962266
)t
)
select
row_number() over(
ORDER BY
inventory_available_day desc,
real_qty desc,
sku_id
) as "rank",
sku_id,
pic ,
sku_name ,
properties_value ,
i_id ,
sku_labels ,
brand_name ,
c_id,
category ,
COALESCE(pay_avg_qty_3d,0) AS pay_avg_qty_3d ,
COALESCE(pay_avg_qty_7d,0) AS pay_avg_qty_7d ,
COALESCE(pay_avg_qty_15d,0) AS pay_avg_qty_15d ,
COALESCE(pay_avg_qty_30d,0) AS pay_avg_qty_30d ,
COALESCE(return_avg_qty_3d,0) AS return_avg_qty_3d ,
COALESCE(return_avg_qty_7d,0) AS return_avg_qty_7d ,
COALESCE(return_avg_qty_15d,0) AS return_avg_qty_15d ,
COALESCE(return_avg_qty_30d,0) AS return_avg_qty_30d ,
COALESCE(real_qty,0) AS real_qty,
COALESCE(availableQty,0) AS availableQty,
COALESCE(stockout_qty,0) AS stockout_qty,
COALESCE(inventory_available_day,0) AS inventory_available_day ,
COALESCE(no_sent_qty,0) AS no_sent_qty,
COALESCE(today_pay_qty,0) AS today_pay_qty,
COALESCE(will_sent_qty_1d,0) AS will_sent_qty_1d,
COALESCE(will_sent_qty_1d_3d,0) AS will_sent_qty_1d_3d,
COALESCE(will_sent_qty_3d_7d,0) AS will_sent_qty_3d_7d,
COALESCE(will_sent_qty_7d,0) AS will_sent_qty_7d,
COALESCE(qty,0) AS qty,
COALESCE(virtual_qty,0) AS virtual_qty,
COALESCE(in_qty,0) AS in_qty,
COALESCE(purchase_qty,0) AS purchase_qty,
COALESCE(allocate_qty,0) AS allocate_qty,
COALESCE(return_qty,0) AS return_qty
from qty_data where
availableQty<0
order by
"rank"
limit 100 offset 0;
starrocks 结果: 66 rows in set (0.11 sec)
hologresSQL
set optimizer_join_order = query;
-- explain ANALYSE
with qty_data as (
select
sku_id,
pic ,
sku_name ,
properties_value ,
i_id ,
sku_labels::text ,
brand_name ,
c_id,
category ,
round(pay_avg_qty_3d,1) as pay_avg_qty_3d ,
round(pay_avg_qty_7d,1) as pay_avg_qty_7d ,
round(pay_avg_qty_15d,1) as pay_avg_qty_15d ,
round(pay_avg_qty_30d,1) as pay_avg_qty_30d ,
round(pay_avg_qty_90d,1) as pay_avg_qty_90d,
return_avg_qty_3d ,
return_avg_qty_7d ,
return_avg_qty_15d ,
return_avg_qty_30d ,
main_qty - no_sent_qty as availableQty,
main_qty as real_qty,
case when (no_sent_qty>0 and no_sent_qty-(main_qty)>0 ) then no_sent_qty-(main_qty) else 0 end as stockout_qty,
pay_avg_qty_30d as pay_avg_qty,
case when pay_avg_qty_30d=0 then 0
when (main_qty - no_sent_qty)<=0 then 0
else (main_qty - no_sent_qty)/pay_avg_qty_30d end as inventory_available_day,
no_sent_qty,
today_pay_qty,
will_sent_qty_1d,
will_sent_qty_1d_3d,
will_sent_qty_3d_7d,
will_sent_qty_7d,
main_qty,
virtual_qty,
in_qty,
purchase_qty,
allocate_qty,
return_qty
from(
select
COALESCE(a.sku_id,b.sku_id,c.sku_id) as sku_id ,
pic ,
sku_name ,
properties_value ,
i_id ,
sku_labels ,
brand_name ,
c_id,
category ,
return_avg_qty_3d ,
return_avg_qty_7d ,
return_avg_qty_15d ,
return_avg_qty_30d ,
will_sent_qty_1d,
will_sent_qty_1d_3d,
will_sent_qty_3d_7d,
will_sent_qty_7d,
COALESCE(main_qty,0) AS main_qty,
COALESCE(virtual_qty,0) AS virtual_qty,
COALESCE(in_qty,0) AS in_qty,
COALESCE(purchase_qty,0) AS purchase_qty,
COALESCE(allocate_qty,0) AS allocate_qty,
COALESCE(return_qty,0) AS return_qty,
COALESCE(no_sent_qty,0) AS no_sent_qty,
COALESCE(pay_avg_qty_90d,0) AS pay_avg_qty_90d,
COALESCE(pay_avg_qty_30d,0) AS pay_avg_qty_30d,
COALESCE(pay_avg_qty_15d,0) AS pay_avg_qty_15d,
COALESCE(pay_avg_qty_7d,0) AS pay_avg_qty_7d,
COALESCE(pay_avg_qty_3d,0) AS pay_avg_qty_3d,
COALESCE(today_pay_qty,0) AS today_pay_qty
from test.dws_jst_erp_whse_inventory_ri a
FULL JOIN(
select
co_id,
pt,
sku_id,
sum(case when oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT') THEN sku_qty else 0 end) as no_sent_qty,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= CURRENT_TIMESTAMP + interval '1 day' THEN sku_qty else 0 end) as will_sent_qty_1d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > CURRENT_TIMESTAMP + interval '1 day' and oi_plan_delivery_date <= CURRENT_TIMESTAMP + interval '3 day' THEN sku_qty else 0 end) as will_sent_qty_1d_3d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= CURRENT_TIMESTAMP + interval '3 day' and oi_plan_delivery_date <= CURRENT_TIMESTAMP + interval '7 day' THEN sku_qty else 0 end) as will_sent_qty_3d_7d,
sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > CURRENT_TIMESTAMP + interval '7 day' THEN sku_qty else 0 end) as will_sent_qty_7d,
sum(case when oi_pay_date>CURRENT_DATE then sku_qty else 0 end) as today_pay_qty
from test.dws_jst_erp_whse_order_sku_ri
where co_id=10962266 and pt=10 and sku_valid=1
GROUP BY co_id,pt,sku_id
) b on a.co_id=b.co_id and a.pt=b.pt and a.sku_id=b.sku_id
FULL JOIN test.dws_jst_erp_whse_sku_inventory_statistics_dt c on a.co_id=c.co_id and a.pt=c.pt and a.sku_id=c.sku_id
where a.co_id=10962266 and c.co_id=10962266 and a.pt=10 and c.pt=10
)t
)
select
row_number() over(
ORDER BY
inventory_available_day desc,
real_qty desc,
sku_id
) as "rank",
sku_id,
pic ,
sku_name ,
properties_value ,
i_id ,
sku_labels ,
brand_name ,
c_id,
category ,
COALESCE(pay_avg_qty_3d,0) AS pay_avg_qty_3d ,
COALESCE(pay_avg_qty_7d,0) AS pay_avg_qty_7d ,
COALESCE(pay_avg_qty_15d,0) AS pay_avg_qty_15d ,
COALESCE(pay_avg_qty_30d,0) AS pay_avg_qty_30d ,
COALESCE(return_avg_qty_3d,0) AS return_avg_qty_3d ,
COALESCE(return_avg_qty_7d,0) AS return_avg_qty_7d ,
COALESCE(return_avg_qty_15d,0) AS return_avg_qty_15d ,
COALESCE(return_avg_qty_30d,0) AS return_avg_qty_30d ,
COALESCE(real_qty,0) AS real_qty,
COALESCE(availableQty,0) AS availableQty,
COALESCE(stockout_qty,0) AS stockout_qty,
COALESCE(inventory_available_day,0) AS inventory_available_day ,
COALESCE(no_sent_qty,0) AS no_sent_qty,
COALESCE(today_pay_qty,0) AS today_pay_qty,
COALESCE(will_sent_qty_1d,0) AS will_sent_qty_1d,
COALESCE(will_sent_qty_1d_3d,0) AS will_sent_qty_1d_3d,
COALESCE(will_sent_qty_3d_7d,0) AS will_sent_qty_3d_7d,
COALESCE(will_sent_qty_7d,0) AS will_sent_qty_7d,
COALESCE(main_qty,0) AS main_qty,
COALESCE(virtual_qty,0) AS virtual_qty,
COALESCE(in_qty,0) AS in_qty,
COALESCE(purchase_qty,0) AS purchase_qty,
COALESCE(allocate_qty,0) AS allocate_qty,
COALESCE(return_qty,0) AS return_qty
from qty_data where
availableQty<0
order by
"rank"
limit 100 offset 0
Hologres 结果 执行成功,查询结果:[64]行,当前返回:[64]行,耗时:[293]ms.
