数仓开发之DWD层(一)

article/2025/9/14 4:32:12

目录

 一:流量域未经加工的事务事实表

1.1 主要任务

1.2 思路

1.3 图解

 1.4 代码

二:流量域独立访客事务事实表

2.1 主要任务

2.2 思路分析

 2.3 图解

 2.4 代码


DWD层设计要点:

(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

(2)DWD层表名的命名规范为dwd_数据域_表名

 一:流量域未经加工的事务事实表

1.1 主要任务

1)数据清洗(ETL)

数据传输过程中可能会出现部分数据丢失的情况,导致 JSON 数据结构不再完整,因此需要对脏数据进行过滤。

2)新老访客状态标记修复

日志数据 common 字段下的 is_new 字段是用来标记新老访客状态的,1 表示新访客,0 表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。

3)分流

本节将通过分流对日志数据进行拆分,生成五张事务事实表写入 Kafka

    • 流量域页面浏览事务事实表
    • 流量域启动事务事实表
    • 流量域动作事务事实表
    • 流量域曝光事务事实表
    • 流量域错误事务事实表

1.2 思路

1)数据清洗(ETL)

对流中数据进行解析,将字符串转换为 JSONObject,如果解析报错则必然为脏数据。

定义侧输出流,将脏数据发送到侧输出流,写入 Kafka 脏数据主题。

2)新老访客状态标记修复

(1)前端埋点新老访客状态标记设置规则

以神策提供的第三方埋点服务中新老访客状态标记设置规则为例

  • Web 端:用户第一次访问埋入神策 SDK 页面的当天(即第一天),JS SDK 会在网页的 cookie 中设置一个首日访问的标记,并设置第一天 24 点之前,该标记为 true,即第一天触发的网页端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的网页端所有事件中,is_new = 0
  • 小程序端:用户第一天访问埋入神策 SDK 的页面时,小程序 SDK 会在 storage 缓存中创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。即第一天触发的小程序端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的小程序端所有事件中,is_new = 0
  • APP 端:用户安装 App 后,第一次打开埋入神策 SDK 的 App 的当天,Android/iOS SDK 会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。即第一天触发的 APP 端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的 APP 端所有事件中,is_new = 0

本项目模拟生成的是 APP 端日志数据。对于此类日志,如果首日之后用户清除了手机本地缓存中的标记,再次启动 APP 会重新设置一个首日为 true 的标记,导致本应为 0 的 is_new 字段被置为 1,可能会给相关指标带来误差。因此,有必要对新老访客状态标记进行修复。

(2)新老访客状态标记修复思路

运用 Flink 状态编程,为每个 mid 维护一个键控状态,记录首次访问日期。

①如果 is_new 的值为 1

a)如果键控状态为 null,认为本次是该访客首次访问 APP,将日志中 ts 对应的日期更新到状态中,不对 is_new 字段做修改;

b)如果键控状态不为 null,且首次访问日期不是当日,说明访问的是老访客,将 is_new 字段置为 0

c)如果键控状态不为 null,且首次访问日期是当日,说明访问的是新访客,不做操作;

②如果 is_new 的值为 0

a)如果键控状态为 null,说明访问 APP 的是老访客但本次是该访客的页面日志首次进入程序。当前端新老访客状态标记丢失时,日志进入程序被判定为老访客,Flink 程序就可以纠正被误判的访客状态标记,只要将状态中的日期设置为今天之前即可。本程序选择将状态更新为昨日;

b)如果键控状态不为 null,说明程序已经维护了首次访问日期,不做操作。

3)利用侧输出流实现数据拆分

(1)埋点日志结构分析

前端埋点获取的 JSON 字符串(日志)可能存在 common、start、page、displays、actions、err、ts 七种字段。其中

  • common 对应的是公共信息,是所有日志都有的字段
  • err 对应的是错误信息,所有日志都可能有的字段
  • start 对应的是启动信息,启动日志才有的字段
  • page 对应的是页面信息,页面日志才有的字段
  • displays 对应的是曝光信息,曝光日志才有的字段,曝光日志可以归为页面日志,因此必然有 page 字段
  • actions 对应的是动作信息,动作日志才有的字段,同样属于页面日志,必然有 page 字段。动作信息和曝光信息可以同时存在。
  • ts 对应的是时间戳,单位:毫秒,所有日志都有的字段

综上,我们可以将前端埋点获取的日志分为两大类:启动日志和页面日志。二者都有 common 字段和 ts 字段,都可能有 err 字段。页面日志一定有 page 字段,一定没有 start 字段,可能有 displays 和 actions 字段;启动日志一定有 start 字段,一定没有 page、displays 和 actions 字段。

(2)分流日志分类

本节将按照内容,将日志分为以下五类

  • 启动日志
  • 页面日志
  • 曝光日志
  • 动作日志
  • 错误日志

(3)分流思路

①所有日志数据都可能拥有 err 字段,所有首先获取 err 字段,如果返回值不为 null 则将整条日志数据发送到错误侧输出流。然后删掉 JSONObject 中的 err 字段及对应值;

②判断是否有 start 字段,如果有则说明数据为启动日志,将其发送到启动侧输出流;如果没有则说明为页面日志,进行下一步;

③页面日志必然有 page 字段、 common 字段和 ts 字段,获取它们的值,ts 封装为包装类 Long,其余两个字段的值封装为 JSONObject;

④判断是否有 displays 字段,如果有,将其值封装为 JSONArray,遍历该数组,依次获取每个元素(记为 display),封装为JSONObject。创建一个空的 JSONObject,将 display、common、page和 ts 添加到该对象中,获得处理好的曝光数据,发送到曝光侧输出流。动作日志的处理与曝光日志相同(注意:一条页面日志可能既有曝光数据又有动作数据,二者没有任何关系,因此曝光数据不为 null 时仍要对动作数据进行处理);

⑤动作日志和曝光日志处理结束后删除 displays 和 actions 字段,此时主流的 JSONObject 中只有 common 字段、 page 字段和 ts 字段,即为最终的页面日志。

处理结束后,页面日志数据位于主流其余四种日志分别位于对应的侧输出流,将五条流的数据写入 Kafka 对应主题即可。

1.3 图解

 1.4 代码

1)在 KafkaUtil 工具类中补充 getKafkaProducer() 方法 --- 生产者

 public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic){return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());}

2)创建 DateFormatUtil 工具类用于日期格式化

package com.atguigu.gmall.realtime.util;import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;public class DateFormatUtil {private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static Long toTs(String dtStr, boolean isFull) {LocalDateTime localDateTime = null;if (!isFull) {dtStr = dtStr + " 00:00:00";}localDateTime = LocalDateTime.parse(dtStr, dtfFull);return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();}public static Long toTs(String dtStr) {return toTs(dtStr, false);}public static String toDate(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtf.format(localDateTime);}public static String toYmdHms(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtfFull.format(localDateTime);}public static void main(String[] args) {System.out.println(toYmdHms(System.currentTimeMillis()));}
}

 3)主程序

package com.atguigu.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.concurrent.TimeUnit;public class BaseLogApp {//数据源:web/app -> Nginx -> 日志服务器(.log) -> flume ->Kafka (ODS) -> FlinkApp -> Kafka(DWD)//程  序:Mock(lg.sh) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK)public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*//1.1 开启CheckPointenv.enableCheckpointing(5 *6000L , CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(10 *6000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));//1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop107:8020/211126/ck");System.setProperty("HADOOP_USER_NAME","atguigu");*///2.消费Kafka topic_log 主题的数据创建流String topic = "topic_log";String groupId = "base_log_app_211126";DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));//3.过滤掉非JSON格式的数据&将每行数据转换为JSON对象OutputTag<String> dirtyTag = new OutputTag<String>("dirty"){};SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {@Overridepublic void processElement(String value, ProcessFunction<String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);collector.collect(jsonObject);//主输出流} catch (Exception e) {context.output(dirtyTag, value);//非JSON数据输出到侧输出流中}}});//获取侧输出流脏数据并打印DataStream<String> dirtyDS = jsonObjDS.getSideOutput(dirtyTag);dirtyDS.print("Dirty>>>>>>>");//将脏数据输出到指定主题上String dirty_topic="dwd_traffic_dirty_log";dirtyDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(dirty_topic));//4.按照Mid分组KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));//5.使用状态编程做新老用户标记校验SingleOutputStreamOperator<JSONObject> jsonObjectWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {private ValueState<String> lastVisitState;@Overridepublic void open(Configuration parameters) throws Exception {lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));}@Overridepublic JSONObject map(JSONObject value) throws Exception {//获取is_new 标记 & ts 并将时间戳转换为年月日String isNew = value.getJSONObject("common").getString("is_new");Long ts = value.getLong("ts");String curDate = DateFormatUtil.toDate(ts);//获取状态中的日期String lastDate = lastVisitState.value();//判断is_new标记是否为“1”if ("1".equals(isNew)) {if (lastDate == null) {lastVisitState.update(curDate);} else if (!lastDate.equals(curDate)) {value.getJSONObject("common").put("is_new", "0");}} else if (lastDate == null) {lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));//将时间改成昨天}return value;}});//6.使用测输出流进行分流处理,这里把页面当成主流,其他(启动、曝光、动作、错误)放到侧输出流中OutputTag<String> startTag = new OutputTag<String>("start"){};OutputTag<String> displayTag = new OutputTag<String>("display"){};OutputTag<String> actionTag = new OutputTag<String>("action"){};OutputTag<String> errorTag = new OutputTag<String>("error"){};SingleOutputStreamOperator<String> pageDS = jsonObjectWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {@Overridepublic void processElement(JSONObject value, ProcessFunction<JSONObject, String>.Context context, Collector<String> collector) throws Exception {//尝试获取错误信息String err = value.getString("err");if (err != null) {//将数据写到侧输出流context.output(errorTag, value.toJSONString());}//移除错误信息value.remove("err");//尝试获得启动信息String start = value.getString("start");if (start != null) {//将数据写到start侧输出流context.output(startTag, value.toJSONString());} else {//获取公共(common)信息 & 页面 & 时间戳(ts)String common = value.getString("common");String pageId = value.getJSONObject("page").getString("page_id");Long ts = value.getLong("ts");//尝试获取动作数据JSONArray actions = value.getJSONArray("actions");if (actions != null && actions.size() > 0) {//避免actions标签内容为空//遍历曝光数据 & 写到display侧输出流中for (int i = 0; i < actions.size(); i++) {JSONObject action = actions.getJSONObject(i);action.put("common", common);action.put("page_id", pageId);context.output(actionTag, action.toJSONString());}}//尝试获取曝光数据JSONArray displays = value.getJSONArray("displays");if (displays != null && displays.size() > 0) {//避免displays标签内容为空//遍历曝光数据 & 写到display侧输出流中for (int i = 0; i < displays.size(); i++) {JSONObject display = displays.getJSONObject(i);display.put("common", common);display.put("page_id", pageId);display.put("ts", ts);context.output(displayTag, display.toJSONString());}}//移除曝光和动作数据 & 写到页面日志主流value.remove("displays");value.remove("actions");collector.collect(value.toJSONString());}}});//7.提取各个侧输出流数据DataStream<String> startDS = pageDS.getSideOutput(startTag);DataStream<String> displayDS = pageDS.getSideOutput(displayTag);DataStream<String> actionDS = pageDS.getSideOutput(actionTag);DataStream<String> errorDS = pageDS.getSideOutput(errorTag);//8.将数据打印写入对应的主题pageDS.print("Page>>>>>>>>>");startDS.print("Start>>>>>>>");displayDS.print("Display>>>");actionDS.print("Action>>>>>");errorDS.print("Error>>>>>>>");String page_topic = "dwd_traffic_page_log";String start_topic = "dwd_traffic_start_log";String display_topic = "dwd_traffic_display_log";String action_topic = "dwd_traffic_action_log";String error_topic = "dwd_traffic_error_log";pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(page_topic));startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(start_topic));displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(display_topic));actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(action_topic));errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(error_topic));//9.启动任务env.execute("BaseLogApp");}
}

二:流量域独立访客事务事实表

2.1 主要任务

过滤页面数据中的独立访客访问记录。

2.2 思路分析

1)过滤 last_page_id 不为 null 的数据

独立访客数据对应的页面必然是会话起始页面,last_page_id 必为 null。过滤 last_page_id != null 的数据,减小数据量,提升计算效率。

2)筛选独立访客记录

运用 Flink 状态编程,为每个 mid 维护一个键控状态,记录末次登录日期

如果末次登录日期为 null 或者不是今日,则本次访问是该 mid 当日首次访问,保留数据,将末次登录日期更新为当日。否则不是当日首次访问,丢弃数据。

3)状态存活时间设置

如果保留状态,第二日同一 mid 再次访问时会被判定为新访客,如果清空状态,判定结果相同,所以只要时钟进入第二日状态就可以清空。

设置状态的 TTL 1 天,更新模式为 OnCreateAndWrite,表示在创建和更新状态时重置状态存活时间。如:2022-02-21 08:00:00 首次访问,若 2022-02-22 没有访问记录,则 2022-02-22 08:00:00 之后状态清空。

 2.3 图解

 2.4 代码

1)主程序

package com.atguigu.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;//数据源:web/app -> Nginx -> 日志服务器(.log) -> flume ->Kafka (ODS) -> FlinkApp -> Kafka(DWD) -> Flink(App) -> Kafka(DWD)
//程  序:Mock(lg.sh) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> DwdTrafficUniqueVisitorDetail ->Kafka(ZK)public class DwdTrafficUniqueVisitorDetail {//独立访客需求(uv)public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*//1.1 开启CheckPointenv.enableCheckpointing(5 *6000L , CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(10 *6000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));//1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop107:8020/211126/ck");System.setProperty("HADOOP_USER_NAME","atguigu");*///2.读取kafka页面日志主题创建流String topic= "dwd_traffic_page_log";// String topic= "topic_log";String groupId= "unique_visitor_detail_211126";DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));//3.过滤掉上一跳页面不为null的数据并将每行数据转换为JSON对象SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> collector) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);//获取上一跳页面IDString lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");if (lastPageId == null) {//收集当天首次登陆的数据collector.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println("脏数据>>>"+value);//脏数据}}});//4.按照mid分组KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));//5.使用状态编程实现按照mid的去重SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {private ValueState<String> lastVisitState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("last-visit", String.class);//设置状态的TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();stateDescriptor.enableTimeToLive(ttlConfig);lastVisitState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {//获取状态数据&当前数据中的时间戳并转换为日期String lastDate = lastVisitState.value();Long ts = value.getLong("ts");String curDate = DateFormatUtil.toDate(ts);//如果最近一次访问app/web的日期不是今天 或者 没有最近一次访问的日期(新用户) ,则将最近一次的访问日期// 更新到今天,并存储到流中if (lastDate == null || !lastDate.equals(curDate)) {lastVisitState.update(curDate);return true;}return false;}});//6.将数据写入kafkaString targetTopic = "dwd_traffic_unique_visitor_detail";uvDS.print(">>>>>>>>>>");uvDS.map(JSONAware::toJSONString).addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));//7.执行env.execute();}
}


http://chatgpt.dhexx.cn/article/weUol8Io.shtml

相关文章

数据仓库之DWD层

DWD&#xff08;Data WareHouse Detail&#xff09;数据明细层&#xff0c;主要是将从业务数据库中同步过来的ODS层数据进行清洗和整合成相应的事实表。事实表作为数据仓库维度建模的核心&#xff0c;需要紧紧围绕着业务过程来设计。在拿到业务系统的表结构后&#xff0c;进行大…

数仓建设 | ODS、DWD、DWM等理论实战(好文收藏)

本文目录&#xff1a; 一、数据流向 二、应用示例 三、何为数仓DW 四、为何要分层 五、数据分层 六、数据集市 七、问题总结 导读 数仓在建设过程中&#xff0c;对数据的组织管理上&#xff0c;不仅要根据业务进行纵向的主题域划分&#xff0c;还需要横向的数仓分层规范。本文…

数仓及其维度(分层)建模(ODS DWD DWS DWT ADS)

一. 数仓及其维度 1. 什么是数仓&#xff1f; 数据仓库&#xff0c;简称数仓,&#xff08; Data Warehouse &#xff09;。从逻辑上理解&#xff0c;数据库和数仓没有区别&#xff0c;都是通过数据库软件实现存放数据的地方&#xff0c;只不过从数据量来说&#xff0c;数据仓库…

[数据仓库]分层概念,ODS,DM,DWD,DWS,DIM的概念

目录 前言&#xff1a; 一. 各种名词解释 1.1 ODS是什么&#xff1f; 1.2 数据仓库层DW&#xff1f; 1.2.1 DWD明细层? 1.2.2 DWM 轻度汇总层(MID或DWB, data warehouse basis) 1.2.3 DWS 主题层(DM&#xff0c;data market或DWS, data warehouse service) 1.3 APP&…

详解数据仓库和数据集市:ODS、DW、DWD、DWM、DWS、ADS

一、数据流向 二、应用示例 三、何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建设的目的&#xff0c;是为前端…

数据仓库分层DWD、DWB、DWS

DW &#xff1a;data warehouse 翻译成数据仓库 DW数据分层&#xff0c;由下到上为 DWD,DWB,DWS DWD&#xff1a;data warehouse detail 细节数据层&#xff0c;有的也称为 ODS层&#xff0c;是业务层与数据仓库的隔离层 DWB&#xff1a;data warehouse base 基础数据层&#x…

数据分层详解ODS、DWD、DWM、DWS、ADS

详解数仓中的数据分层&#xff1a;ODS、DWD、DWM、DWS、ADS 何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建…

简单搞定数仓搭建:数仓模型(DWD)

明细粒度事实层&#xff08;DWD&#xff09; 明细粒度事实层以业务过程驱动建模&#xff0c;基于每个具体的业务过程特点&#xff0c;构建最细粒度的明细层事实表。您可以结合企业的数据使用特点&#xff0c;将明细事实表的某些重要维度属性字段做适当冗余&#xff0c;即宽表化…

数据仓库和数据集市详解:ODS、DW、DWD、DWM、DWS、ADS

数据流向 应用示例 何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建设的目的&#xff0c;是为前端查询和分析…

六、Sails中执行存储过程模拟Waterline的Create插入数据

文章目录 创建 baseCreate 存储过程参数设置Prepared StatementsLAST_INSERT_ID和IDENTITY 模拟WaterlinesendNativeQuery规划密钥处理转换字段名称和字段值返回数据处理修改控制器代码datetime bugmysql库中对数据库字段类型定义customToJSON postman自动化测试 清楚Waterline…

oracle cdr是什么,CDRD TALK|全栈架构Sails.js简介

原标题&#xff1a;CDRD TALK|全栈架构Sails.js简介 Sails.js是一个可伸缩的、数据驱动的、面向服务的现代App架构。它致力于构建基于Node.js服务的定制化企业级应用。在Sails.js之前&#xff0c;构建一个实用的产品级Node.js应用的时间成本通常以月为单位计算。但是使用Sails.…

node-sails后台搭建

这个就直接简单搭建最基本的后台了 一、安装 安装sails npm i sails sails -v //检测版本 创建空项目 sails new my-app 安装数据库 cd my-app npm install sails-mysql -save 二、文件配置 Datastores.js 里面的数据库配置url Local.js里面port :1448端口 服务启动的端口 …

三、以user表为例,用Amis+Sails实现增删改查操作

文章目录 CRUD 组件查查询api分页fetcher参数观察统一处理method分页参数提交到后端自定义分页和页面大小&#xff08;pageSize&#xff09; 搜索排序头部工具条列折叠按钮刷新和导出excel自定义内容 删单条删除批量删除 增新增数据headerToolbar 结果分析前端数据格式要求 改数…

三、Sails 中使用Jwt进行身份认证

文章目录 Jwt 概述为什么要用JwtJwt原理 Jwt认证安装 Jwt 库登录ApiVerify Signature过期时间Nodejs 单线程易崩问题 验证程序修改配置积极策略消极策略多重验证 Jwt 测试正常登录过期或错误密钥测试 Jwt 概述 由于我们是完全前后端分离的开发模式&#xff0c;我们的后端对前端…

Sails基础之Controller层

通过前面的使用&#xff0c;我们可以看出Sails中MVC的结构更倾向于MVP的概念&#xff0c;Presenter在Sails中被称之为Actions&#xff1a; They often act as a middleman between your models and views. Controller层这个结构上的变化是Sails v1.0中新提出的方案&#xff0c;…

二、 在Sails中使用Typescript

文章目录 Typescript 基础Typescript 安装TypeScript 问题最简单的改造 Sails重定义Waterline&#xff08;Orm&#xff09; 重写ModelsTypescript 重写控制器User Model的进一步优化前后端约定 路径别名tsconfig.jsonmodule-alias安装使用 Jest测试 Typescript 基础 Typescrip…

四、Sails项目的Api文档——集成Swagger解决方案

文章目录 Api的迷思SwaggerSwagger概述在Sails中集成Swagger安装Swagger 生成设置生成的内容SwaggerUI Assets和.tmpBlueprint 蓝图Blueprint是什么Blueprint 配置local.js 进一步控制Swagger输出路由过滤路由的Swagger配置进一步优化Authorization Api的迷思 我们都知道写代码…

sails mysql_Sails+MVC+Mysql+Node+学习笔记一

项目构建 安装Node就不多说了&#xff0c; 1.sails安装与项目新建运行 npm install sails -g//全局安装 sails new project-name//新建项目 cd project-name //进入刚才新建项目的目录 sails lift //运行项目&#xff0c;运行原理也是直接在项目目录路径下使用node app.js npm …

Sails.js自动化Api实践与测试

开发中为了快速交互数据库&#xff0c;于是需要一个能便捷搭建api的平台。于是学习了一下sails.js框架。本次实践是一次摸索&#xff0c;使用了winston日志记录&#xff0c;supertest单元测试&#xff0c;mongo数据库&#xff0c;hashids哈希值解密。 模块: winstonsupertestmo…

五、解读Sails之Waterline源代码

文章目录 sql调试代码跟踪package.json启动调试Auto-Migrating备份原始数据删除所有表再重建回写备份数据 加密库 encrypted-attraes-256-gcm算法encrypted-att 的使用密钥 sql转义 sqlstring日期处理三种方式比较mariaDB&#xff08;或my-sql&#xff09;中的日期时间string 对…