Structured Streaming 快速入门系列(三)Structured Streaming 实战之 Sink

article/2025/8/27 22:05:01

文章目录

  • Sink
    • HDFS Sink
    • Kafka Sink
    • Foreach Writer
    • 自定义 Sink
    • Tigger
    • 从 Source 到 Sink 的流程
    • 错误恢复和容错语义

Sink

目标和步骤

目标

  • 能够串联两端, 理解整个流式应用, 以及其中的一些根本的原理, 比如说容错语义
  • 能够知道如何对接外部系统, 写入数据

步骤

  • HDFS Sink
  • Kafka Sink
  • Foreach Sink
  • 自定义 Sink
  • Tiggers
  • Sink 原理
  • 错误恢复和容错语义

HDFS Sink

目标和步骤

目标

能够使用 Spark 将流式数据的处理结果放入 HDFS

步骤

  • 场景和需求
  • 代码实现

场景和需求

场景

  • Kafka 往往作为数据系统和业务系统之间的桥梁
  • 数据系统一般由批量处理和流式处理两个部分组成
  • 在 Kafka 作为整个数据平台入口的场景下, 需要使用 StructuredStreaming 接收 Kafka 的数据并放置于 HDFS 上, 后续才可以进行批量处理

在这里插入图片描述
案例需求

从 Kafka 接收数据, 从给定的数据集中, 裁剪部分列, 落地于 HDFS

代码实现
步骤说明

  • 从 Kafka 读取数据, 生成源数据集
  • 连接 Kafka 生成 DataFrame
  • 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
  • 对源数据集选择列
  • 解析 CSV 格式的数据
  • 生成正确类型的结果集
  • 落地 HDFS

整体代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("parquet") // 也可以是 "orc", "json", "csv" 等.option("path", "/dataset/streaming/result/").start()

Kafka Sink

目标和步骤

目标

掌握什么时候要将流式数据落地至 Kafka, 以及如何落地至 Kafka

步骤

  • 场景
  • 代码

场景

  • 有很多时候, ETL 过后的数据, 需要再次放入 Kafka
  • 在 Kafka 后, 可能会有流式程序统一将数据落地到 HDFS 或者 HBase

在这里插入图片描述
案例需求

从 Kafka 中获取数据, 简单处理, 再次放入 Kafka

代码

步骤

  • 从 Kafka 读取数据, 生成源数据集
  • 连接 Kafka 生成 DataFrame
  • 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
  • 对源数据集选择列
  • 解析 CSV 格式的数据
  • 生成正确类型的结果集
  • 再次落地 Kafka

代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("kafka").outputMode(OutputMode.Append()).option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("topic", "streaming-bank-result").start().awaitTermination()

Foreach Writer

目标和步骤

目标

掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink, 同时能够将数据落地到 MySQL

步骤

  • 需求
  • 代码

需求

场景

大数据有一个常见的应用场景

  • 收集业务系统数据
  • 数据处理
  • 放入 OLTP 数据
  • 外部通过 ECharts 获取并处理数据

这个场景下, StructuredStreaming 就需要处理数据并放入 MySQL 或者 MongoDB, HBase 中以供 Web 程序可以获取数据, 图表的形式展示在前端

在这里插入图片描述

Foreach 模式::

起因

  • 在 Structured Streaming 中, 并未提供完整的 MySQL/JDBC 整合工具
  • 不止 MySQL 和 JDBC, 可能会有其它的目标端需要写入
  • 很多时候 Structured Streaming 需要对接一些第三方的系统, 例如阿里云的云存储, 亚马逊云的云存储等, 但是 Spark 无法对所有第三方都提供支持, 有时候需要自己编写

解决方案

在这里插入图片描述

  • 既然无法满足所有的整合需求, StructuredStreaming 提供了 Foreach, 可以拿到每一个批次的数据
  • 通过 Foreach 拿到数据后, 可以通过自定义写入方式, 从而将数据落地到其它的系统

案例需求::

在这里插入图片描述
从 Kafka 中获取数据, 处理后放入 MySQL

步骤

  • 创建 DataFrame 表示 Kafka 数据源
  • 在源 DataFrame 中选择三列数据
  • 创建 ForeachWriter 接收每一个批次的数据落地 MySQL
  • Foreach 落地数据

代码


import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")class MySQLWriter extends ForeachWriter[Row] {val driver = "com.mysql.jdbc.Driver"var statement: Statement = _var connection: Connection  = _val url: String = "jdbc:mysql://Bigdata01:3306/streaming-bank-result"val user: String = "root"val pwd: String = "000000"override def open(partitionId: Long, version: Long): Boolean = {Class.forName(driver)connection = DriverManager.getConnection(url, user, pwd)this.statement = connection.createStatementtrue}override def process(value: Row): Unit = {statement.executeUpdate(s"insert into bank values(" +s"${value.getAs[Int]("age")}, " +s"${value.getAs[Int]("job")}, " +s"${value.getAs[Int]("balance")} )")}override def close(errorOrNull: Throwable): Unit = {connection.close()}
}result.writeStream.foreach(new MySQLWriter).start().awaitTermination()

自定义 Sink

目标和步骤

目标

  • Foreach 倾向于一次处理一条数据, 如果想拿到 DataFrame 幂等的插入外部数据源, 则需要自定义 Sink
  • 了解如何自定义 Sink

步骤

  • Spark 加载 Sink 流程分析
  • 自定义 Sink

Spark 加载 Sink 流程分析
Sink 加载流程

writeStream 方法中会创建一个 DataStreamWriter 对象

def writeStream: DataStreamWriter[T] = {if (!isStreaming) {logicalPlan.failAnalysis("'writeStream' can be called only on streaming Dataset/DataFrame")}new DataStreamWriter[T](this)
}

在 DataStreamWriter 对象上通过 format 方法指定 Sink 的短名并记录下来

def format(source: String): DataStreamWriter[T] = {this.source = sourcethis
}

最终会通过 DataStreamWriter 对象上的 start 方法启动执行, 其中会通过短名创建 DataSource

val dataSource =DataSource(df.sparkSession,className = source,  //传入的 Sink 短名 options = extraOptions.toMap,partitionColumns = normalizedParCols.getOrElse(Nil))

在创建 DataSource 的时候, 会通过一个复杂的流程创建出对应的 Source 和 Sink

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)

在这个复杂的创建流程中, 有一行最关键的代码, 就是通过 Java 的类加载器加载所有的 DataSourceRegister

val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

在 DataSourceRegister 中会创建对应的 Source 或者 Sink

trait DataSourceRegister {def shortName(): String       //	提供短名
}trait StreamSourceProvider {def createSource(          //创建 SourcesqlContext: SQLContext,metadataPath: String,schema: Option[StructType],providerName: String,parameters: Map[String, String]): Source
}trait StreamSinkProvider {def createSink( //创建 Sink	             sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink
}

自定义 Sink 的方式

根据前面的流程说明, 有两点非常重要

  • Spark 会自动加载所有 DataSourceRegister 的子类, 所以需要通过 DataSourceRegister 加载 Source 和 Sink
  • Spark 提供了 StreamSinkProvider 用以创建 Sink, 提供必要的依赖

所以如果要创建自定义的 Sink, 需要做两件事

  • 创建一个注册器, 继承 DataSourceRegister 提供注册功能, 继承 StreamSinkProvider 获取创建 Sink 的必备依赖
  • 创建一个 Sink 子类

自定义 Sink

步骤

  • 读取 Kafka 数据
  • 简单处理数据
  • 创建 Sink
  • 创建 Sink 注册器
  • 使用自定义 Sink

代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}.as[(Int, Int, Int)].toDF("age", "job", "balance")class MySQLSink(options: Map[String, String], outputMode: OutputMode) extends Sink {override def addBatch(batchId: Long, data: DataFrame): Unit = {val userName = options.get("userName").orNullval password = options.get("password").orNullval table = options.get("table").orNullval jdbcUrl = options.get("jdbcUrl").orNullval properties = new Propertiesproperties.setProperty("user", userName)properties.setProperty("password", password)data.write.mode(outputMode.toString).jdbc(jdbcUrl, table, properties)}
}class MySQLStreamSinkProvider extends StreamSinkProvider with DataSourceRegister {override def createSink(sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink = {new MySQLSink(parameters, outputMode)}override def shortName(): String = "mysql"
}result.writeStream.format("mysql").option("username", "root").option("password", "000000").option("table", "streaming-bank-result").option("jdbcUrl", "jdbc:mysql://Bigdata01:3306/test").start().awaitTermination()

Tigger

目标和步骤
目标

掌握如何控制 StructuredStreaming 的处理时间

步骤

  • 微批次处理
  • 连续流处理

微批次处理

什么是微批次

在这里插入图片描述
并不是真正的流, 而是缓存一个批次周期的数据, 后处理这一批次的数据

通用流程

步骤

  • 根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame
  • Rate 数据源会定期提供一个由两列 timestamp, value 组成的数据, value 是一个随机数

处理和聚合数据, 计算每个个位数和十位数各有多少条数据

  • 对 value 求 log10 即可得出其位数
  • 后按照位数进行分组, 最终就可以看到每个位数的数据有多少个

代码

val spark = SparkSession.builder().master("local[6]").appName("socket_processor").getOrCreate()import org.apache.spark.sql.functions._
import spark.implicits._spark.sparkContext.setLogLevel("ERROR")val source = spark.readStream.format("rate").load()val result = source.select(log10('value) cast IntegerType as 'key, 'value).groupBy('key).agg(count('key) as 'count).select('key, 'count).where('key.isNotNull).sort('key.asc)

默认方式划分批次

介绍

默认情况下的 Structured Streaming 程序会运行在微批次的模式下, 当一个批次结束后, 下一个批次会立即开始处理

步骤

指定落地到 Console 中, 不指定 Trigger

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()

按照固定时间间隔划分批次

介绍

使用微批次处理数据, 使用用户指定的时间间隔启动批次, 如果间隔指定为 0, 则尽可能快的去处理, 一个批次紧接着一个批次

  • 如果前一批数据提前完成, 待到批次间隔达成的时候再启动下一个批次
  • 如果前一批数据延后完成, 下一个批次会在前面批次结束后立即启动
  • 如果没有数据可用, 则不启动处理

步骤

通过 Trigger.ProcessingTime() 指定处理间隔

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.ProcessingTime("2 seconds")).start().awaitTermination()

一次性划分批次

介绍

只划分一个批次, 处理完成以后就停止 Spark 工作, 当需要启动一下 Spark 处理遗留任务的时候, 处理完就关闭集群的情况下, 这个划分方式非常实用

步骤

使用 Trigger.Once 一次性划分批次

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Once()).start().awaitTermination()

连续流处理

介绍

  • 微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理延迟取决于每个 DataFrame 的处理速度, 最快也只能在一个 DataFrame 结束后立刻执行下一个, 最快可以达到 100ms 左右的端到端延迟
  • 而连续流处理可以做到大约 1ms 的端到端数据处理延迟
  • 连续流处理可以达到 at-least-once 的容错语义
  • 从 Spark 2.3 版本开始支持连续流处理, 我们所采用的 2.2 版本还没有这个特性, 并且这个特性截止到 2.4 依然是实验性质, 不建议在生产环境中使用

操作

步骤

使用特殊的 Trigger 完成功能

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Continuous("1 second")).start().awaitTermination()

限制

  • 只支持 Map 类的有类型操作
  • 只支持普通的的 SQL 类操作, 不支持聚合
  • Source 只支持 Kafka
  • Sink 只支持 Kafka, Console, Memory

从 Source 到 Sink 的流程

目标和步骤

目标

理解 Source 到 Sink 的整体原理

步骤

从 Source 到 Sink 的流程

从 Source 到 Sink 的流程

在这里插入图片描述

  • 在每个 StreamExecution 的批次最开始, StreamExecution 会向 Source 询问当前 Source 的最新进度, 即最新的 offset
  • StreamExecution 将 Offset 放到 WAL 里
  • StreamExecution 从 Source 获取 start offset, end offset 区间内的数据
  • StreamExecution 触发计算逻辑 logicalPlan 的优化与编译
  • 计算结果写出给 Sink
    • 调用 Sink.addBatch(batchId: Long, data: DataFrame) 完成
    • 此时才会由 Sink 的写入操作开始触发实际的数据获取和计算过程
  • 在数据完整写出到 Sink 后, StreamExecution 通知 Source 批次 id 写入到 batchCommitLog, 当前批次结束

错误恢复和容错语义

目标和步骤

目标

理解 Structured Streaming 中提供的系统级别容错手段

步骤

  • 端到端
  • 三种容错语义
  • Sink 的容错

端到端

在这里插入图片描述

  • Source 可能是 Kafka, HDFS
  • Sink 也可能是 Kafka, HDFS, MySQL 等存储服务
  • 消息从 Source 取出, 经过 Structured Streaming 处理, 最后落地到 Sink 的过程, 叫做端到端

三种容错语义

at-most-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 出错了, Sink 可能没收到数据, 但是不会收到两次, 叫做 at-most-once
  • 一般错误恢复的时候, 不重复计算, 则是 at-most-once

at-least-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 出错了, Sink 一定会收到数据, 但是可能收到两次, 叫做 at-least-once
  • 一般错误恢复的时候, 重复计算可能完成也可能未完成的计算, 则是 at-least-once

exactly-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 虽然出错了, Sink 一定恰好收到应该收到的数据, 一条不重复也一条都不少, 即是 exactly-once
  • 想做到 exactly-once 是非常困难的

Sink 的容错

在这里插入图片描述
故障恢复一般分为 Driver 的容错和 Task 的容错

  • Driver 的容错指的是整个系统都挂掉了
  • Task 的容错指的是一个任务没运行明白, 重新运行一次

因为 Spark 的 Executor 能够非常好的处理 Task 的容错, 所以我们主要讨论 Driver 的容错, 如果出错的时候

  • 读取 WAL offsetlog 恢复出最新的 offsets
    当 StreamExecution 找到 Source 获取数据的时候, 会将数据的起始放在 WAL offsetlog 中, 当出错要恢复的时候, 就可以从中获取当前处理批次的数据起始, 例如 Kafka 的 Offset

  • 读取 batchCommitLog 决定是否需要重做最近一个批次
    当 Sink 处理完批次的数据写入时, 会将当前的批次 ID 存入 batchCommitLog, 当出错的时候就可以从中取出进行到哪一个批次了, 和 WAL 对比即可得知当前批次是否处理完

  • 如果有必要的话, 当前批次数据重做

    • 如果上次执行在 (5) 结束前即失效, 那么本次执行里 Sink 应该完整写出计算结果
    • 如果上次执行在 (5) 结束后才失效, 那么本次执行里 Sink 可以重新写出计算结果 (覆盖上次结果), 也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)

这样即可保证每次执行的计算结果, 在 Sink 这个层面, 是 不重不丢 的, 即使中间发生过失效和恢复, 所以 Structured Streaming 可以做到 exactly-once

容错所需要的存储

存储

  • offsetlog 和 batchCommitLog 关乎于错误恢复
  • offsetlog 和 batchCommitLog 需要存储在可靠的空间里
  • offsetlog 和 batchCommitLog 存储在 Checkpoint 中
  • WAL 其实也存在于 Checkpoint 中

指定 Checkpoint

只有指定了 Checkpoint 路径的时候, 对应的容错功能才可以开启

aggDF.writeStream.outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir") //指定 Checkpoint 的路径, 这个路径对应的目录必须是 HDFS 兼容的文件系统.format("memory").start()

需要的外部支持

如果要做到 exactly-once, 只是 Structured Streaming 能做到还不行, 还需要 Source 和 Sink 系统的支持

  • Source 需要支持数据重放

当有必要的时候, Structured Streaming 需要根据 start 和 end offset 从 Source 系统中再次获取数据, 这叫做重放

  • Sink 需要支持幂等写入

如果需要重做整个批次的时候, Sink 要支持给定的 ID 写入数据, 这叫幂等写入, 一个 ID 对应一条数据进行写入, 如果前面已经写入, 则替换或者丢弃, 不能重复

所以 Structured Streaming 想要做到 exactly-once, 则也需要外部系统的支持, 如下

Source

Sources是否可重放原生内置支持注解
HDFS可以已支持包括但不限于 Text, JSON, CSV, Parquet, ORC
Kafka可以已支持Kafka 0.10.0+
RateStream可以已支持以一定速率产生数据
RDBMS可以待支持预计后续很快会支持
Socket不可以已支持主要用途是在技术会议和讲座上做 Demo

Sink

Sinks是否幂等写入原生内置支持注解
HDFS可以支持包括但不限于 Text, JSON, CSV, Parquet, ORC
ForeachSink可以支持可定制度非常高的 Sink, 是否可以幂等取决于具体的实现
RDBMS可以待支持预计后续很快会支持
Kafka不可以支持Kafka 目前不支持幂等写入, 所以可能会有重复写入

http://chatgpt.dhexx.cn/article/41tflTXs.shtml

相关文章

STM32基础入门学习笔记:内部高级功能应用

文章目录: 一:低功耗模式 1.睡眠模式测试程序 NVIC.h NVIC.c key.h key.c main.c 2.停机模式测试程序 main.c 3.待机模式测试程序 main.c 二:看门狗 1.独立看门狗测试程序 iwdg.h iwdg.c main.c 2.窗口看门狗测试程序 wwdg…

STM32初学者入门FreeRTOS操作系统,多任务实时系统

1.详细介绍FreeRTOS操作系统,然后举了几个实例 FreeRTOS(Free Real-Time Operating System)是一个开源的嵌入式实时操作系统,它专门设计用于在资源有限的嵌入式系统中运行。FreeRTOS提供了一些用于任务管理、调度、同步和通信的功…

【STM32系列汇总】博主的STM32实战快速进阶之路(持续更新)

我把之前在学习和工作中使用STM32进行嵌入式开发的经验和教程等相关整理到这里,方便查阅学习,如果能帮助到您,请帮忙点个赞; 本文的宗旨 STM32 只是一个硬件平台,同样地他可以换成MSP430,NXP的RT等等&…

<STC32G12K128入门第四节>USAT串口(实战使用ESP32进行TCP通信)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、串口是什么?二、UART寄存器1.引脚配置寄存器2.SCON寄存器3.SBUF数据发送接收缓冲区4.辅助寄存器5.串口重定向三、案例总结前言 一、串口是什么? 这个我不想多说就是一种通讯方式,想具体…

STM32入门(二)

STM32入门(二) 一、新建工程 二、库开发与寄存器开发的关系 固件库就是函数的集合,固件库函数的作用是向下负责与寄存器直接打交道,向上提供用户函数调用的接口(API)。在51的开发中我们常常的作法是直接操…

STM32 快速入门(内核架构,启动方式,开发参考资料,芯片选型)

文章目录 1、启动方式(Start up)2、开发参考资料2.1 STM32 中文参考手册 3、通常的芯片选型步骤4、存储器和总线构架4.1 系统构架4.1.1 ICode 总线4.1.2 DCode 总线4.1.3 系统总线4.1.4 DMA 总线4.1.5 总线矩阵4.1.6 AHB/APB 桥(APB) 4.2 存储器组织&…

STM32—串口使用及配置(入门详解)

目录 一、常用函数 二、状态标记变量 三、串口接收中断流程 四、串口工具 五、配置1(非中断) 六、main.c代码(非中断) 七、配置2(中断) 八、main.c代码(中断) 一、常用函数 …

stm32学习(入门2)

stm32学习心得 提示:写博客纯属第一次练手,不足之处希望多提建议 文章目录 stm32学习心得前言一、什么是串口二、观察现象在我们波特率不同的情况下我们会发现乱码ch340短接波特率不同情况下波特率相同情况下 stm32代码总结 前言 我们前面写了stm32对le…

【STM32Cube】学习笔记(六):DHT11温湿度传感器

文章目录 摘要一、简介1.DHT11数字温湿度传感器2.DHT11性能参数2.DHT11数据结构2.DHT11传输时序 二、硬件电路设计1.模块内部电路2.与单片机相连接电路 三、软件设计1.CubeMX配置2.CubeIDE代码 四、结果显示五、总结附录 摘要 本篇文章用STM32CubeMX和STM32CubeIDE软件编程&am…

STM32入门篇之DHT11温湿度传感器

目录 前言一、项目介绍1.1 项目名称1.2 项目设计思路二、硬件准备2.1 STM32F407ZGT6三、软件准备3.1 Keil53.2 STM32f407固件库3.3 STM32CudeMx3.4 STM32CudeMx的f407软件包四、项目实施4.1 硬件平台开发4.1.1 keil5安装4.1.2 STM32CudeMx安装4.1.3 BSP工程项目创建4.1.4 BSP工…

STM32单片机入门教程---STM32简介

文章目录 一.简介二.片上资源(外设)三.命名规则四.系统结构五.引脚定义六.启动配置七.最小系统电路 一.简介 STM32是ST公司基于ARM Cortex-M内核开发的32位微控制器STM32家族系列 本次教程使用的是STM32F1系列(主流系列)ARM ARM…

关于新手学习STM32开发应该如何入门?

对于新手来说,学习STM32开发可能会感到困惑,尤其是在拿到开发板后该如何入门。在这里有嵌入式学习路线,毕设,各种项目,需要留个6。以下是部分内容概述:硬件介绍:了解STM32开发板的基本硬件组成和…

STM32_基础入门(九)_串口通讯详解

持续关注阿杰在线更新保姆式笔记~~坚持日更 参考资料: 《STM32F1开发指南-库函数版本》- 5.3 usart串口文件夹介绍 -第9章 串口实验 《STM32中文参考手册V10》-第25章通用同步异步收发器(USART) 片上外设GPIO配置 --《STM32中文参考手册V10》-8.1.11 外…

STM32cubemx教程及STM32入门(四)串口通信

STM32cubemx教程及STM32入门(四)串口通信 2022.8.24 前言 本章主要介绍串口通信的概念以及在STM32单片机上通过STM32CubeMX和HAL库进行串口通信,同时重定义了printf函数。 简介 在串行通信中,一个字符一个字符地传输&#xff…

STM32系列--从入门到精通

使用STM32也有好几个年头,起初用的开发板已经积灰了,刷干净开发板上的灰,我觉得,应该写点什么东西了,把这些年来调试走过的路,一些心得,一些体会,记录下来,希望对大家有所…

STM32 从入门到精通系列讲解 - 总目录

👦 作者介绍:Bazinga bingo,专注C语言应用硬核干货分享,潜心修炼,虚心学习,立志做嵌入式相关赛道的Top。 📕 本文收录于《STM32开发》专栏,包含STM32内部模块介绍、片内资源开发、不…

STM32入门教程第二讲

系列文章目录 STM32入门教程第二讲------介绍GPIO 目录 系列文章目录 前言 一、GPIO是什么? 1.简要介绍GPIO 2.GPIO基本结构 二.GPIO的模式介绍 (四种输入四种输出) 1.浮空输入_IN_FLOATING: ​编辑 2. 模拟输入_AIN ​编辑 3带上拉输入_IPU 4 .带下拉输…

【STM32】串口通信基本原理(超基础、详细版)

STM32F1xx官方资料: 《STM32中文参考手册V10》-第25章通用同步异步收发器(USART) 通信接口背景知识 设备之间通信的方式 一般情况下,设备之间的通信方式可以分成并行通信和串行通信两种。它们的区别是: 并、串行通信的区别 并行…

【STM32】HAL库 STM32CubeMX教程四---UART串口通信详解

前言: 今天我们学习STM32CubeMX串口的操作,以及HAL库串口的配置,我们会详细的讲解各个模块的使用和具体功能,并且基于HAL库实现Printf函数功能重定向,UART中断接收,本系列教程将HAL库与STM32CubeMX结合在…

横屏切换视频(iOS基于ZFPlayer 做的横屏上下滑动切换视频的需求)

思路:全屏到某一个视频时把这个视频的上一个和下一个视频一起传到全屏播放页,滑动时通过改变播放器的位置,切换视频(说得不清楚,不多说上代码,不理解或者需要详细思路下方留言) 1,手…