白名单数据同步

登录 tunnel:bin/odpscmd

  1. 下载数据:
tunnel download jst_biz_cdw_dev.dim_jst_erp_dataproduct_inventory_whitelist/bizhour=2023010311 data/dim_jst_erp_dataproduct_inventory_whitelist.txt;
  1. StarRocks 建表
-- Whitelist dimension table: one row per (co_id, co_name) with a subscription window.
-- StarRocks Duplicate Key table sorted on (co_id, co_name), hash-distributed on co_id into 8 buckets.
CREATE TABLE IF NOT EXISTS dim_jst_erp_dataproduct_inventory_whitelist (
    co_id      BIGINT,
    co_name    STRING,
    version    INT,
    start_time DATETIME COMMENT '订阅开始时间',
    -- The end_time comment previously duplicated start_time's text ("subscription start time").
    end_time   DATETIME COMMENT '订阅结束时间'
)
DUPLICATE KEY(co_id, co_name)
DISTRIBUTED BY HASH(co_id) BUCKETS 8;
  1. 通过 Stream Load 导入数据
# Stream Load the tab-separated whitelist export into jst_biz.dim_jst_erp_dataproduct_inventory_whitelist.
# - Credentials come from STARROCKS_USER / STARROCKS_PASSWORD (export them first) instead of being hard-coded here.
# - The separator is written as the escape sequence \t; a literal tab character is easily lost when the command is copied.
# - Expect:100-continue lets the server reject the job before the file body is transferred.
curl --location-trusted -u "${STARROCKS_USER}:${STARROCKS_PASSWORD}" -H "Expect:100-continue" -H "column_separator:\t" -T dim_jst_erp_dataproduct_inventory_whitelist.txt http://127.0.0.1:8030/api/jst_biz/dim_jst_erp_dataproduct_inventory_whitelist/_stream_load

库存日均 dt 同步

  1. 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dws_jst_erp_whse_sku_inventory_statistics_dt data/dws_jst_erp_whse_sku_inventory_statistics_dt_20230116.txt;
  1. StarRocks 建表
-- Per-merchant, per-SKU attributes plus daily-average sales and return quantities.
-- StarRocks Duplicate Key table sorted on (co_id, sku_id), hash-distributed on co_id into 16 buckets.
create table if not exists dws_jst_erp_whse_sku_inventory_statistics_dt (
    -- identifiers and descriptive attributes
    co_id bigint comment '商家编号',
    sku_id string comment '商品编码',
    pic string comment '商品图片',
    sku_name string comment '商品名称',
    properties_value string comment '颜色规格',
    i_id string comment '款式编码',
    sku_labels string comment '商品标签',
    brand_name string comment '商品品牌',
    category string comment '商品分类',
    c_id bigint comment '分类id',
    -- daily-average units sold over trailing windows
    pay_avg_qty_3d decimal(38, 4) comment '近3天日均销售件数',
    pay_avg_qty_7d decimal(38, 4) comment '近7天日均销售件数',
    pay_avg_qty_15d decimal(38, 4) comment '近15天日均销售件数',
    pay_avg_qty_30d decimal(38, 4) comment '近30天日均销售件数',
    pay_avg_qty_90d decimal(38, 4) comment '近90天日均销售件数',
    -- daily-average units returned over trailing windows
    return_avg_qty_3d decimal(38, 4) comment '近3天日均退货件数',
    return_avg_qty_7d decimal(38, 4) comment '近7天日均退货件数',
    return_avg_qty_15d decimal(38, 4) comment '近15天日均退货件数',
    return_avg_qty_30d decimal(38, 4) comment '近30天日均退货件数',
    -- bookkeeping
    update_time datetime comment '更新时间',
    pt int comment '分区'
)
duplicate key(co_id, sku_id)
distributed by hash(co_id) buckets 16;
  1. 通过脚本清洗换行符 python etl.py dws_jst_erp_whse_sku_inventory_statistics_dt_20230116.txt dws_jst_erp_whse_sku_inventory_statistics_dt_20230116_.txt
  2. 通过 Stream Load 导入数据
# Stream Load the cleaned '^'-separated export into jst_biz.dws_jst_erp_whse_sku_inventory_statistics_dt.
# Credentials come from STARROCKS_USER / STARROCKS_PASSWORD (export them first) instead of being hard-coded here.
# Expect:100-continue lets the server reject the job before the file body is transferred.
curl --location-trusted -u "${STARROCKS_USER}:${STARROCKS_PASSWORD}" -H "Expect:100-continue" -H "column_separator:^" -T dws_jst_erp_whse_sku_inventory_statistics_dt_20230116_.txt http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_sku_inventory_statistics_dt/_stream_load

未发货订单同步

  1. 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dws_jst_erp_whse_order_sku_ri_test data/dws_jst_erp_whse_order_sku_ri_test.txt;
  1. StarRocks 建表
-- Order-item / SKU rows used by the unshipped-order sync.
-- StarRocks Primary Key table keyed on (co_id, o_id, oi_id, sku_id), hash-distributed on co_id into 16 buckets.
CREATE TABLE IF NOT EXISTS dws_jst_erp_whse_order_sku_ri (
    -- BIGINT (was INT) so the type matches co_id in dws_jst_erp_whse_inventory_ri and
    -- dws_jst_erp_whse_sku_inventory_statistics_dt, which this table is joined with on co_id.
    co_id                 BIGINT,
    o_id                  BIGINT,
    oi_id                 BIGINT,
    sku_id                STRING,
    skus                  STRING,
    sku_qty               BIGINT,
    sku_valid             INT,
    oi_pay_date           DATETIME,
    oi_send_date          DATETIME,
    oi_plan_delivery_date DATETIME,
    oi_status             STRING,
    oi_ts                 BIGINT,
    update_time           DATETIME
)
PRIMARY KEY (co_id, o_id, oi_id, sku_id)
DISTRIBUTED BY HASH(co_id) BUCKETS 16;
  1. 通过脚本合并文件、清洗换行符:cat dws_jst_erp_whse_order_sku_ri_test.*.txt > all_dws_jst_erp_whse_order_sku_ri_test.txt && python etl.py all_dws_jst_erp_whse_order_sku_ri_test.txt all_dws_jst_erp_whse_order_sku_ri_test_.txt
  2. 通过 Stream Load 导入数据
# Stream Load the merged, cleaned '^'-separated export into jst_biz.dws_jst_erp_whse_order_sku_ri.
# The "columns" header maps the file's field order (skus before sku_id) onto the table's columns by name.
# Credentials come from STARROCKS_USER / STARROCKS_PASSWORD (export them first) instead of being hard-coded here.
# Expect:100-continue lets the server reject the job before the file body is transferred.
curl --location-trusted -u "${STARROCKS_USER}:${STARROCKS_PASSWORD}" -H "Expect:100-continue" -H "column_separator:^" -H "columns: co_id, o_id, oi_id,skus,sku_id,sku_qty,sku_valid,oi_pay_date,oi_send_date,oi_plan_delivery_date,oi_status,oi_ts,update_time" -T all_dws_jst_erp_whse_order_sku_ri_test_.txt http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_order_sku_ri/_stream_load

库存数据初始化同步

  1. 下载数据:
tunnel download -fd ^ jst_biz_cdw_dev.dwd_jst_erp_whse_inventory_init_ht data/dwd_jst_erp_whse_inventory_init_ht.txt;
  1. StarRocks 建表
-- Per-merchant, per-SKU inventory snapshot loaded from the inventory-initialisation export.
-- StarRocks Primary Key table keyed on (co_id, sku_id), hash-distributed on co_id into 16 buckets.
create table if not exists dws_jst_erp_whse_inventory_ri (
    -- key and descriptive attributes
    co_id bigint comment '商家编号',
    sku_id string comment '商品编码',
    i_id string comment '款式编码',
    sku_name string comment '商品名称',
    properties_value string comment '颜色及规格',
    pic string comment '图片路径',
    -- stock quantities and locks
    qty bigint comment '数量',
    in_qty bigint comment '进货仓库存',
    return_qty bigint comment '销退仓库存',
    defective_qty bigint comment '次品库存',
    order_lock bigint comment '订单占有数',
    pick_lock bigint comment '仓库待发数',
    min_qty bigint comment '安全库存下限',
    max_qty bigint comment '安全库存上限',
    virtual_qty bigint comment '虚拟库存数',
    purchase_qty bigint comment '采购在途数',
    min_day bigint comment '安全天数下限',
    max_day bigint comment '安全天数上限',
    allocate_qty bigint comment '调拨在途数',
    customize_qty_1 bigint comment '自定义仓1库存',
    customize_qty_2 bigint comment '自定义仓2库存',
    customize_qty_3 bigint comment '自定义仓3库存',
    -- source-row bookkeeping (created/modified are kept as strings, as in the original definition)
    created string comment '创建时间',
    modified string comment '修改时间',
    ts bigint comment '时间戳',
    db_id int comment '业务分库号',
    tb_id int comment '业务分表号',
    del_flag tinyint comment '删除标志:0=正常单,1=业务删除,2=物理删除',
    stock bigint comment '可用库存,根据商家配置表设置的逻辑来计算',
    update_time datetime comment '更新时间',
    pt int comment '分区'
)
primary key (co_id, sku_id)
distributed by hash(co_id) buckets 16;
  1. 通过脚本合并文件、清洗换行符 python etl.py dwd_jst_erp_whse_inventory_init_ht.txt dwd_jst_erp_whse_inventory_init_ht_.txt
  2. 通过 Stream Load 导入数据
# Stream Load the cleaned '^'-separated inventory export into jst_biz.dws_jst_erp_whse_inventory_ri.
# Credentials come from STARROCKS_USER / STARROCKS_PASSWORD (export them first) instead of being hard-coded here.
# Expect:100-continue lets the server reject the job before the file body is transferred.
curl --location-trusted -u "${STARROCKS_USER}:${STARROCKS_PASSWORD}" -H "Expect:100-continue" -H "column_separator:^" -T dwd_jst_erp_whse_inventory_init_ht_.txt http://127.0.0.1:8030/api/jst_biz/dws_jst_erp_whse_inventory_ri/_stream_load

性能测试

测试 SQL1

-- Benchmark query 1 (StarRocks dialect).
-- For merchant co_id = 10962266: combine the inventory snapshot (a), aggregated unshipped-order
-- demand (b) and per-SKU sales/return averages (c), derive availability metrics per SKU, and return
-- the first 100 SKUs whose available quantity (qty - no_sent_qty) is negative.
with qty_data as (
    select
        sku_id,
        pic,
        sku_name,
        properties_value,
        i_id,
        sku_labels,
        brand_name,
        c_id,
        category,
        round(pay_avg_qty_3d, 1) as pay_avg_qty_3d,
        round(pay_avg_qty_7d, 1) as pay_avg_qty_7d,
        round(pay_avg_qty_15d, 1) as pay_avg_qty_15d,
        round(pay_avg_qty_30d, 1) as pay_avg_qty_30d,
        round(pay_avg_qty_90d, 1) as pay_avg_qty_90d,
        return_avg_qty_3d,
        return_avg_qty_7d,
        return_avg_qty_15d,
        return_avg_qty_30d,
        -- stock left after reserving everything that is still unshipped
        qty - no_sent_qty as availableQty,
        qty as real_qty,
        -- shortfall: unshipped demand that exceeds the stock on hand
        case when (no_sent_qty > 0 and no_sent_qty - (qty) > 0) then no_sent_qty - (qty) else 0 end as stockout_qty,
        pay_avg_qty_30d as pay_avg_qty,
        -- available stock divided by the 30-day daily average; 0 when there are no sales or no stock left
        case
            when pay_avg_qty_30d = 0 then 0
            when (qty - no_sent_qty) <= 0 then 0
            else (qty - no_sent_qty) / pay_avg_qty_30d
        end as inventory_available_day,
        no_sent_qty,
        today_pay_qty,
        will_sent_qty_1d,
        will_sent_qty_1d_3d,
        will_sent_qty_3d_7d,
        will_sent_qty_7d,
        qty,
        virtual_qty,
        in_qty,
        purchase_qty,
        allocate_qty,
        return_qty
    from (
        select
            coalesce(a.sku_id, b.sku_id, c.sku_id) as sku_id,
            c.pic,
            c.sku_name,
            c.properties_value,
            c.i_id,
            sku_labels,
            brand_name,
            c_id,
            category,
            return_avg_qty_3d,
            return_avg_qty_7d,
            return_avg_qty_15d,
            return_avg_qty_30d,
            will_sent_qty_1d,
            will_sent_qty_1d_3d,
            will_sent_qty_3d_7d,
            will_sent_qty_7d,
            coalesce(qty, 0) as qty,
            coalesce(virtual_qty, 0) as virtual_qty,
            coalesce(in_qty, 0) as in_qty,
            coalesce(purchase_qty, 0) as purchase_qty,
            coalesce(allocate_qty, 0) as allocate_qty,
            coalesce(return_qty, 0) as return_qty,
            coalesce(no_sent_qty, 0) as no_sent_qty,
            coalesce(pay_avg_qty_90d, 0) as pay_avg_qty_90d,
            coalesce(pay_avg_qty_30d, 0) as pay_avg_qty_30d,
            coalesce(pay_avg_qty_15d, 0) as pay_avg_qty_15d,
            coalesce(pay_avg_qty_7d, 0) as pay_avg_qty_7d,
            coalesce(pay_avg_qty_3d, 0) as pay_avg_qty_3d,
            coalesce(today_pay_qty, 0) as today_pay_qty
        from jst_biz.dws_jst_erp_whse_inventory_ri a
        full join (
            -- Unshipped demand per SKU. A line counts as unshipped when it has no send date or its
            -- status is not 'SENT'; quantities are bucketed by planned delivery date relative to now.
            select
                co_id,
                sku_id,
                sum(case when oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT') then sku_qty else 0 end) as no_sent_qty,
                -- due within 1 day
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= date_add(current_timestamp(), interval 1 day) then sku_qty else 0 end) as will_sent_qty_1d,
                -- due in (1 day, 3 days]
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > date_add(current_timestamp(), interval 1 day) and oi_plan_delivery_date <= date_add(current_timestamp(), interval 3 day) then sku_qty else 0 end) as will_sent_qty_1d_3d,
                -- due in (3 days, 7 days]; the lower bound was "<= 3 DAY", which re-counted the two buckets above
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > date_add(current_timestamp(), interval 3 day) and oi_plan_delivery_date <= date_add(current_timestamp(), interval 7 day) then sku_qty else 0 end) as will_sent_qty_3d_7d,
                -- due later than 7 days
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > date_add(current_timestamp(), interval 7 day) then sku_qty else 0 end) as will_sent_qty_7d,
                -- paid after today's midnight
                sum(case when oi_pay_date > curdate() then sku_qty else 0 end) as today_pay_qty
            from jst_biz.dws_jst_erp_whse_order_sku_ri
            where co_id = 10962266
              and sku_valid = 1
            group by co_id, sku_id
        ) b
            on a.co_id = b.co_id
           and a.sku_id = b.sku_id
        full join jst_biz.dws_jst_erp_whse_sku_inventory_statistics_dt c
            on a.co_id = c.co_id
           and a.sku_id = c.sku_id
        -- NOTE(review): requiring a.co_id and c.co_id to be non-NULL here discards every row in which
        -- a or c was NULL-extended by the FULL JOINs, so SKUs present only in b or only in c never
        -- appear (a-b behaves like a LEFT JOIN, a-c like an INNER JOIN). Left unchanged so that the
        -- recorded benchmark result still applies; confirm whether this is intended.
        where a.co_id = 10962266
          and c.co_id = 10962266
    ) t
)
select
    -- `rank` is backtick-quoted: StarRocks reads double-quoted text as a string literal, so
    -- ORDER BY "rank" would sort by a constant instead of by this column.
    row_number() over (
        order by
            inventory_available_day desc,
            real_qty desc,
            sku_id
    ) as `rank`,
    sku_id,
    pic,
    sku_name,
    properties_value,
    i_id,
    sku_labels,
    brand_name,
    c_id,
    category,
    coalesce(pay_avg_qty_3d, 0) as pay_avg_qty_3d,
    coalesce(pay_avg_qty_7d, 0) as pay_avg_qty_7d,
    coalesce(pay_avg_qty_15d, 0) as pay_avg_qty_15d,
    coalesce(pay_avg_qty_30d, 0) as pay_avg_qty_30d,
    coalesce(return_avg_qty_3d, 0) as return_avg_qty_3d,
    coalesce(return_avg_qty_7d, 0) as return_avg_qty_7d,
    coalesce(return_avg_qty_15d, 0) as return_avg_qty_15d,
    coalesce(return_avg_qty_30d, 0) as return_avg_qty_30d,
    coalesce(real_qty, 0) as real_qty,
    coalesce(availableQty, 0) as availableQty,
    coalesce(stockout_qty, 0) as stockout_qty,
    coalesce(inventory_available_day, 0) as inventory_available_day,
    coalesce(no_sent_qty, 0) as no_sent_qty,
    coalesce(today_pay_qty, 0) as today_pay_qty,
    coalesce(will_sent_qty_1d, 0) as will_sent_qty_1d,
    coalesce(will_sent_qty_1d_3d, 0) as will_sent_qty_1d_3d,
    coalesce(will_sent_qty_3d_7d, 0) as will_sent_qty_3d_7d,
    coalesce(will_sent_qty_7d, 0) as will_sent_qty_7d,
    coalesce(qty, 0) as qty,
    coalesce(virtual_qty, 0) as virtual_qty,
    coalesce(in_qty, 0) as in_qty,
    coalesce(purchase_qty, 0) as purchase_qty,
    coalesce(allocate_qty, 0) as allocate_qty,
    coalesce(return_qty, 0) as return_qty
from qty_data
where availableQty < 0
order by `rank`
limit 100 offset 0;

StarRocks 结果: 66 rows in set (0.11 sec)

Hologres 测试 SQL

-- Benchmark query 1 (Hologres / PostgreSQL dialect).
-- For merchant co_id = 10962266, partition pt = 10: combine the inventory snapshot (a), aggregated
-- unshipped-order demand (b) and per-SKU sales/return averages (c), derive availability metrics per
-- SKU, and return the first 100 SKUs whose available quantity (main_qty - no_sent_qty) is negative.
set optimizer_join_order = query;
-- explain ANALYSE
with qty_data as (
    select
        sku_id,
        pic,
        sku_name,
        properties_value,
        i_id,
        sku_labels::text,
        brand_name,
        c_id,
        category,
        round(pay_avg_qty_3d, 1) as pay_avg_qty_3d,
        round(pay_avg_qty_7d, 1) as pay_avg_qty_7d,
        round(pay_avg_qty_15d, 1) as pay_avg_qty_15d,
        round(pay_avg_qty_30d, 1) as pay_avg_qty_30d,
        round(pay_avg_qty_90d, 1) as pay_avg_qty_90d,
        return_avg_qty_3d,
        return_avg_qty_7d,
        return_avg_qty_15d,
        return_avg_qty_30d,
        -- stock left after reserving everything that is still unshipped
        main_qty - no_sent_qty as availableQty,
        main_qty as real_qty,
        -- shortfall: unshipped demand that exceeds the stock on hand
        case when (no_sent_qty > 0 and no_sent_qty - (main_qty) > 0) then no_sent_qty - (main_qty) else 0 end as stockout_qty,
        pay_avg_qty_30d as pay_avg_qty,
        -- available stock divided by the 30-day daily average; 0 when there are no sales or no stock left
        case
            when pay_avg_qty_30d = 0 then 0
            when (main_qty - no_sent_qty) <= 0 then 0
            else (main_qty - no_sent_qty) / pay_avg_qty_30d
        end as inventory_available_day,
        no_sent_qty,
        today_pay_qty,
        will_sent_qty_1d,
        will_sent_qty_1d_3d,
        will_sent_qty_3d_7d,
        will_sent_qty_7d,
        main_qty,
        virtual_qty,
        in_qty,
        purchase_qty,
        allocate_qty,
        return_qty
    from (
        select
            coalesce(a.sku_id, b.sku_id, c.sku_id) as sku_id,
            pic,
            sku_name,
            properties_value,
            i_id,
            sku_labels,
            brand_name,
            c_id,
            category,
            return_avg_qty_3d,
            return_avg_qty_7d,
            return_avg_qty_15d,
            return_avg_qty_30d,
            will_sent_qty_1d,
            will_sent_qty_1d_3d,
            will_sent_qty_3d_7d,
            will_sent_qty_7d,
            coalesce(main_qty, 0) as main_qty,
            coalesce(virtual_qty, 0) as virtual_qty,
            coalesce(in_qty, 0) as in_qty,
            coalesce(purchase_qty, 0) as purchase_qty,
            coalesce(allocate_qty, 0) as allocate_qty,
            coalesce(return_qty, 0) as return_qty,
            coalesce(no_sent_qty, 0) as no_sent_qty,
            coalesce(pay_avg_qty_90d, 0) as pay_avg_qty_90d,
            coalesce(pay_avg_qty_30d, 0) as pay_avg_qty_30d,
            coalesce(pay_avg_qty_15d, 0) as pay_avg_qty_15d,
            coalesce(pay_avg_qty_7d, 0) as pay_avg_qty_7d,
            coalesce(pay_avg_qty_3d, 0) as pay_avg_qty_3d,
            coalesce(today_pay_qty, 0) as today_pay_qty
        from test.dws_jst_erp_whse_inventory_ri a
        full join (
            -- Unshipped demand per SKU. A line counts as unshipped when it has no send date or its
            -- status is not 'SENT'; quantities are bucketed by planned delivery date relative to now.
            select
                co_id,
                pt,
                sku_id,
                sum(case when oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT') then sku_qty else 0 end) as no_sent_qty,
                -- due within 1 day
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date <= current_timestamp + interval '1 day' then sku_qty else 0 end) as will_sent_qty_1d,
                -- due in (1 day, 3 days]
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > current_timestamp + interval '1 day' and oi_plan_delivery_date <= current_timestamp + interval '3 day' then sku_qty else 0 end) as will_sent_qty_1d_3d,
                -- due in (3 days, 7 days]; the lower bound was "<= 3 day", which re-counted the two buckets above
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > current_timestamp + interval '3 day' and oi_plan_delivery_date <= current_timestamp + interval '7 day' then sku_qty else 0 end) as will_sent_qty_3d_7d,
                -- due later than 7 days
                sum(case when (oi_send_date is null or (oi_send_date is not null and oi_status != 'SENT')) and oi_plan_delivery_date > current_timestamp + interval '7 day' then sku_qty else 0 end) as will_sent_qty_7d,
                -- paid after today's midnight
                sum(case when oi_pay_date > current_date then sku_qty else 0 end) as today_pay_qty
            from test.dws_jst_erp_whse_order_sku_ri
            where co_id = 10962266
              and pt = 10
              and sku_valid = 1
            group by co_id, pt, sku_id
        ) b
            on a.co_id = b.co_id
           and a.pt = b.pt
           and a.sku_id = b.sku_id
        full join test.dws_jst_erp_whse_sku_inventory_statistics_dt c
            on a.co_id = c.co_id
           and a.pt = c.pt
           and a.sku_id = c.sku_id
        -- NOTE(review): requiring a.* and c.* to be non-NULL here discards every row in which a or c
        -- was NULL-extended by the FULL JOINs, so SKUs present only in b or only in c never appear
        -- (a-b behaves like a LEFT JOIN, a-c like an INNER JOIN). Left unchanged so that the recorded
        -- benchmark result still applies; confirm whether this is intended.
        where a.co_id = 10962266
          and c.co_id = 10962266
          and a.pt = 10
          and c.pt = 10
    ) t
)
select
    -- "rank" is double-quoted because it is an identifier here (PostgreSQL-style quoting).
    row_number() over (
        order by
            inventory_available_day desc,
            real_qty desc,
            sku_id
    ) as "rank",
    sku_id,
    pic,
    sku_name,
    properties_value,
    i_id,
    sku_labels,
    brand_name,
    c_id,
    category,
    coalesce(pay_avg_qty_3d, 0) as pay_avg_qty_3d,
    coalesce(pay_avg_qty_7d, 0) as pay_avg_qty_7d,
    coalesce(pay_avg_qty_15d, 0) as pay_avg_qty_15d,
    coalesce(pay_avg_qty_30d, 0) as pay_avg_qty_30d,
    coalesce(return_avg_qty_3d, 0) as return_avg_qty_3d,
    coalesce(return_avg_qty_7d, 0) as return_avg_qty_7d,
    coalesce(return_avg_qty_15d, 0) as return_avg_qty_15d,
    coalesce(return_avg_qty_30d, 0) as return_avg_qty_30d,
    coalesce(real_qty, 0) as real_qty,
    coalesce(availableQty, 0) as availableQty,
    coalesce(stockout_qty, 0) as stockout_qty,
    coalesce(inventory_available_day, 0) as inventory_available_day,
    coalesce(no_sent_qty, 0) as no_sent_qty,
    coalesce(today_pay_qty, 0) as today_pay_qty,
    coalesce(will_sent_qty_1d, 0) as will_sent_qty_1d,
    coalesce(will_sent_qty_1d_3d, 0) as will_sent_qty_1d_3d,
    coalesce(will_sent_qty_3d_7d, 0) as will_sent_qty_3d_7d,
    coalesce(will_sent_qty_7d, 0) as will_sent_qty_7d,
    coalesce(main_qty, 0) as main_qty,
    coalesce(virtual_qty, 0) as virtual_qty,
    coalesce(in_qty, 0) as in_qty,
    coalesce(purchase_qty, 0) as purchase_qty,
    coalesce(allocate_qty, 0) as allocate_qty,
    coalesce(return_qty, 0) as return_qty
from qty_data
where availableQty < 0
order by "rank"
limit 100 offset 0;

Hologres 结果 执行成功,查询结果:[64]行,当前返回:[64]行,耗时:[293]ms.