Kubeedge Beehive 模块源码分析

article/2025/9/12 22:33:35

文章目录

  • 概述
  • 结构
  • Model --- 消息模型
    • Header --- 消息头
    • Router --- 消息路由
      • 资源操作
      • 资源类型
  • Context --- 上下文
    • ModuleContext --- 模块上下文
    • MessageContext --- 消息上下文
    • GlobalContext --- 全局上下文
      • 方法
  • Channel Context
    • 数据结构
    • 方法
      • ModuleContext 接口实现
        • AddModule
        • AddModuleGroup
        • Cleanup
      • MessageContext 接口实现
        • Send
        • Receive
        • SendSync
        • SendResp
        • SendToGroup
        • SendToGroupSync
  • Socket Context
    • 主体
      • 数据结构
    • broker
      • 数据结构
      • 方法
        • Connect
        • Send
        • Receive
        • SendSyncInternal
    • Store
      • 数据结构
    • Wapper
      • Packer
        • Read
        • Write
  • Module
    • StartModules
    • GracefulShutdown

概述

Beehive 模块是 kubeedge 的核心模块,它负责管理所有模块的启动与停止,同时也负责多模块间的通信,它当前主要由: model, context, socket, channel 四个部分组成,其中:

  1. model 部分定义了消息的模型,这个消息模型是各个组件间通信所必须符合的规范。
  2. context 部分定义了消息的上下文以及模块上下文两个接口,同时使用了一个全局上下文来管理各个类型的上下文。
  3. socket 部分则实现了socket 类型的上下文通信,主要用于非本地通信。
  4. channel 部分则实现了channel 类型的上下文通信,主要用于本地通信。

结构

beehive

Model — 消息模型

在 model 中定义了消息的模型, 其主要结构如下:

type Message struct {  Header  MessageHeader `json:"header"`  Router  MessageRoute  `json:"route,omitempty"`  Content interface{}   `json:"content"`  
}

Header 为消息头,Router 为消息路由,Content 为消息本体。

Header — 消息头

消息头中主要定义了一些消息头部的详细信息,其结构如下:

type MessageHeader struct {  //消息的ID,使用UUID生成。 ID string `json:"msg_id"`  //消息的父ID,一般在响应消息时候填充,其一般要与请求消息的ID相同ParentID string `json:"parent_msg_id,omitempty"`  //消息的创建时间Timestamp int64 `json:"timestamp"`  //消息的特定资源版本,目前保存的是 k8s 资源的版本。//kubeedge利用消息资源版本的概念来实现可靠传输。ResourceVersion string `json:"resourceversion,omitempty"`  //发送同步的标志位,该标志将在 sendsync 中设置。Sync bool `json:"sync,omitempty"`  //船渡消息的类型,一般为 channel,unixsocket 等类型,如果为空,则默认是 channel 类型MessageType string `json:"type,omitempty"`  
}

Router — 消息路由

消息路由中定义了消息的一些操作和目的地等信息,其结构如下:

type MessageRoute struct {  //消息的来源Source string `json:"source,omitempty"`  //消息的目的地Destination string `json:"destination,omitempty"`  //消息广播的时候需要广播到哪个组Group string `json:"group,omitempty"`  //如何去操作资源Operation string `json:"operation,omitempty"`  //想要操作的资源类型是什么Resource string `json:"resource,omitempty"`  
}

资源操作

描述了可以对资源进行哪些操作:

const (InsertOperation        = "insert"  DeleteOperation        = "delete"  QueryOperation         = "query"  UpdateOperation        = "update"  PatchOperation         = "patch"  UploadOperation        = "upload"  ResponseOperation      = "response"  ResponseErrorOperation = "error"
)

资源类型

描述了由哪些资源类型:

const (ResourceTypePod                 = "pod"  ResourceTypeConfigmap           = "configmap"  ResourceTypeServiceAccountToken = "serviceaccounttoken"  ResourceTypeSecret              = "secret"  ResourceTypeNode                = "node"  ResourceTypePodlist             = "podlist"  ResourceTypePodStatus           = "podstatus"  ResourceTypePodPatch            = "podpatch"  ResourceTypeNodeStatus          = "nodestatus"  ResourceTypeNodePatch           = "nodepatch"  ResourceTypeRule                = "rule"  ResourceTypeRuleEndpoint        = "ruleendpoint"  ResourceTypeRuleStatus          = "rulestatus"  ResourceTypeLease               = "lease"  
)

Context — 上下文

ModuleContext — 模块上下文

ModuleContext 接口定义了如何将 module 加入到当前 context, 并将其分组,以及,结束时如何清理模块的接口:

type ModuleContext interface {  AddModule(info *common.ModuleInfo)  AddModuleGroup(module, group string)  Cleanup(module string)  
}

MessageContext — 消息上下文

MessageContext 接口定义了上下文如何为各个模块发送,接收,同步以及广播消息:

type MessageContext interface {  // async mode  Send(module string, message model.Message)  Receive(module string) (model.Message, error)  // sync mode  SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error)  SendResp(message model.Message)  // group broadcast  SendToGroup(group string, message model.Message)  SendToGroupSync(group string, message model.Message, timeout time.Duration) error  
}

当前这个两个接口的实现,在 kubeedge 中,主要是由 socket 部分和 channel 部分对其进行了实现,分别用于远程模块通信与本地模块通信。

GlobalContext — 全局上下文

GlobalContext 主要用来管理 module , message 与 Context 间的关系,以及提供一些方法,来便捷的操作 context, 其主要结构如下:

type GlobalContext struct {// 存储 context 类型与 ModuleContext 接口间关系// key 为 context 类型,value 为对应的 ModuleContext 接口moduleContext  map[string]ModuleContext// 存储 context 类型与 MessageContext 接口间关系  // key 为 context 类型,value 为对应的 MessageContext 接口messageContext map[string]MessageContext  // 存储 module 与 context 类型间的关系// key 为 module 名称,value 为对应的 context 类型moduleContextType map[string]string  // 存储 group 与 context 类型间的关系// key 为 group 名称,value 为对应的 context 类型 groupContextType map[string]string  ctx     gocontext.Context  cancel  gocontext.CancelFunc  ctxLock sync.RWMutex  
}

方法

// 根据传入的 contextTypes 初始化 context
InitContext(contextTypes []string)
// 获取 context
GetContext() gocontext.Context
// 结束
Done() <-chan struct{}
// 取消
Cancel()
// 添加 module
AddModule(module *common.ModuleInfo)
// 添加 module group
AddModuleGroup(module, group string)
// 清理 module
Cleanup(module string)
// 发送消息到模块
Send(module string, message model.Message)
// 接收模块的消息
Receive(module string) (model.Message, error)
// 发送同步消息
SendSync(module string,message model.Message, timeout time.Duration)(model.Message, error)
// 发送响应消息
SendResp(resp model.Message)
// 发送广播消息
SendToGroup(group string, message model.Message)
// 发送同步广播消息
SendToGroupSync(group string, message model.Message, timeout time.Duration) error

Channel Context

数据结构

通信类型的 Context 主要用于本地通信,即程序内部不同模块间的交互。其结构如下:

type Context struct {// 存储 module 与 channel 的关系// key 为模块名称, value 为 对应的 channel// 默认channel 缓冲区大小为1024channels     map[string]chan model.Message  chsLock      sync.RWMutex// 存储 group , module 与 channel 的关系// 第一层 key 为 group// 第二层 key 为 module ,vaule 为 channeltypeChannels map[string]map[string]chan model.Message  typeChsLock  sync.RWMutex // 匿名通道map// 存储 message 与 channel 的关系// key 为 messageID ,value 为 channelanonChannels map[string]chan model.Message  anonChsLock  sync.RWMutex  
}

方法

ModuleContext 接口实现

AddModule

  1. 创建一个类型为 model.Message 类型的,带缓冲区的 channel ,通道大小默认为1024。
  2. 将数据存入 channels map,key 为模块名称, value 为 对应的 channel

代码不是源码,为了方便展示整合过

func (ctx *Context) AddModule(info *common.ModuleInfo) {  channel := make(chan model.Message, ChannelSizeDefault) ctx.channels[module] = moduleCh  
}

AddModuleGroup

  1. 根据 module 名称从 channels map 中获取 channel
  2. 获取成功:
    1. 判断 typeChannels map 中是否存在对应的group, 不存在就初始化一个。
    2. 存在就将对应 module ,group,channel 存储起来
  3. 获取失败就输出警告

代码不是源码,为了方便展示整合过

func (ctx *Context) AddModuleGroup(module, group string) {if _,ok := ctx.channels[module]; ok {  if _, exist := ctx.typeChannels[group]; !exist {  ctx.typeChannels[group] = make(map[string]chan model.Message)  }  ctx.typeChannels[group][module] = ctx.typeChannels[group] return  }  klog.Warningf("Get bad module name %s when addmodulegroup", module)  
}

Cleanup

  1. 根据 module 从 channels map 获取 channel
  2. 获取成功:
    1. 从 channels map 删除数据
    2. 从 typeChannels map 删除数据
    3. sleep 20 Millisecond 以减少通道关闭的可能异常
    4. 关闭 channel

代码不是源码,为了方便展示整合过

func (ctx *Context) Cleanup(module string) {  if _,ok := ctx.channels[module]; ok {  delete(ctx.channels, module) for _, moduleMap := range ctx.typeChannels {  if _, exist := moduleMap[module]; exist {  delete(moduleMap, module)  break  }  } // decrease probable exception of channel closing  time.Sleep(20 * time.Millisecond)  close(channel)  }  
}

MessageContext 接口实现

Send

  1. 根据 module 获取 channel
  2. 往 channel 写 message

代码不是源码,为了方便展示整合过

func (ctx *Context) Send(module string, message model.Message) {  if channel := ctx.getChannel(module); channel != nil {  channel <- message  return  }  
}

Receive

  1. 根据 module 获取 channel
  2. 从 channel 读 message

代码不是源码,为了方便展示整合过

func (ctx *Context) Receive(module string) (model.Message, error) {  if channel := ctx.getChannel(module); channel != nil {  content := <-channel  return content, nil  }  return model.Message{}, fmt.Errorf("failed to get channel for module(%s)", module)  
}

SendSync

  1. 设置 deadline 即超时时间, 传入 timeout <= 0 时,默认为30s
  2. 将消息头 Sync 标志为设置为 true
  3. 根据 module 获取 channel : reqChannel
  4. 创建一个匿名 channel 存入 anonChannels map, key 为 messageID
  5. 创建一个defer 用来删除刚刚创建的匿名 channel
  6. 往 reqChannel 写 message
  7. 写超时就返回发送消息超时错误
  8. 未超时就等待匿名通道返回响应消息
  9. 响应超时就返回接收响应消息超时错误

代码不是源码,为了方便展示整合过

func (ctx *Context) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) {  if timeout <= 0 {  timeout = MessageTimeoutDefault  }  deadline := time.Now().Add(timeout)  // make sure to set sync flag  message.Header.Sync = true  // check req/resp channel  reqChannel := ctx.getChannel(module)  if reqChannel == nil {  return model.Message{}, fmt.Errorf("bad request module name(%s)", module)  }  // new anonymous channel for response  anonChan := make(chan model.Message)  anonName := getAnonChannelName(message.GetID())  ctx.anonChsLock.Lock()  ctx.anonChannels[anonName] = anonChan  ctx.anonChsLock.Unlock()  defer func() {  ctx.anonChsLock.Lock()  delete(ctx.anonChannels, anonName)  close(anonChan)  ctx.anonChsLock.Unlock()  }()  select {  case reqChannel <- message:  case <-time.After(timeout):  return model.Message{}, fmt.Errorf("timeout to send message %s", message.GetID())  }  var resp model.Message  select {  case resp = <-anonChan:  case <-time.After(time.Until(deadline)):  return model.Message{}, fmt.Errorf("timeout to get response for message %s", message.GetID())  }  return resp, nil  
}

SendResp

  1. 根据 messageID 从 anonChannels map 获取 channel
  2. 往匿名 channel 写 message

代码不是源码,为了方便展示整合过

func (ctx *Context) SendResp(message model.Message) {  anonName := getAnonChannelName(message.GetParentID())  if channel, exist := ctx.anonChannels[anonName]; exist {  select {  case channel <- message:  default:}  return  } 
}

SendToGroup

  1. 根据 group 从 typeChannels map 获取当前 group 下的 channel map
  2. 遍历 channel map
  3. 往 channel 写 message

代码不是源码,为了方便展示整合过

func (ctx *Context) SendToGroup(moduleType string, message model.Message) {  send := func(module string, ch chan model.Message) {  select {  case ch <- message:  default:  ch <- message  }  }  if channelList := ctx.getTypeChannel(moduleType); channelList != nil {  for module, channel := range channelList {  go send(module, channel)  }  return  }
}

SendToGroupSync

  1. 设置 deadline 即超时时间, 传入 timeout <= 0 时,默认为30s
  2. 根据 group 从 typeChannels map 获取当前 group 下的 channel map
  3. 创建一个匿名 channel 存入 anonChannels map, key 为 messageID, 缓冲区大小为 channel map 的大小
  4. 创建一个defer 用来删除刚刚创建的匿名 channel,
  5. 将消息头 Sync 标志为设置为 true
  6. 遍历 channel map ,开启多协程往 channel 写 message
  7. 写超时就将 timeoutCounter ++
  8. 等待匿名通道返回响应消息
  9. 响应超时就返回接收响应消息超时错误
  10. 清理刚刚创建匿名 channel

代码不是源码,为了方便展示整合过

func (ctx *Context) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {  if timeout <= 0 {  timeout = MessageTimeoutDefault  }  deadline := time.Now().Add(timeout)  channelList := ctx.getTypeChannel(moduleType)  if channelList == nil {  return fmt.Errorf("failed to get module type(%s) channel list", moduleType)  }  // each module must sync a response,  // let anonchan size be module number   channelNumber := len(channelList)  anonChan := make(chan model.Message, channelNumber)  anonName := getAnonChannelName(message.GetID())  ctx.anonChsLock.Lock()  ctx.anonChannels[anonName] = anonChan  ctx.anonChsLock.Unlock()  cleanup := func() error {  ctx.anonChsLock.Lock()  delete(ctx.anonChannels, anonName)  close(anonChan)  ctx.anonChsLock.Unlock()  var uninvitedGuests int  // cleanup anonchan and check parentid for resp  for resp := range anonChan {  if resp.GetParentID() != message.GetID() {  uninvitedGuests++  }  }  if uninvitedGuests != 0 {return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests)  }  return nil  }  // make sure to set sync flag before sending  message.Header.Sync = true  var timeoutCounter int32  send := func(ch chan model.Message) {  // avoid exception because of channel closing  // TODO: need reconstruction  defer func() {  if exception := recover(); exception != nil {  klog.Warningf("xxx")  }  }()  sendTimer := time.NewTimer(time.Until(deadline))  select {  case ch <- message:  sendTimer.Stop()  case <-sendTimer.C:  atomic.AddInt32(&timeoutCounter, 1)  }  }  for _, channel := range channelList {  go send(channel)  }  sendTimer := time.NewTimer(time.Until(deadline))  ticker := time.NewTicker(TickerTimeoutDefault)  for {  if len(anonChan) == channelNumber {  break  }  select {  case <-ticker.C:  case <-sendTimer.C:  err := cleanup()  if err != nil {  klog.Errorf("Failed to cleanup, error: %v", err)  }  if timeoutCounter != 0 {  return fmt.Errorf("xxx")  }  return fmt.Errorf("timeout to send message")  }  }  return cleanup()  
}

Socket Context

socket 部分主要用于远程信息交换,底层通过 net.conn 获取连接。它主要有以下几个部分组成:

  • broker: 网络代理
  • config: 配置
  • socket: socket module
  • stroe: 通信存储
  • keeper: 心跳保持
  • wapper: 消息打包

主体

数据结构

外层的Context 只负责实现 MessageContext 与 ModuleContext 接口和存储实际用于通信的 context。对应 MessageContext 接口实现也是调用 通信context 的方法来实现。

type Context struct {  // 存储 module 与 context 的关系// key 为 module,value 为 contextcontexts map[string]*context  // 存储 group 与 context 的关系// key 为 module,value 为 contextgroups map[string]*context  sync.RWMutex  
}type context struct {  name       string  address    string  moduleType string  bufferSize int  certificate tls.Certificate  store       *store.PipeStore  broker      *broker.RemoteBroker  
}

而在 context 中:

  • 消息通讯主要使用 broker 进行
  • module 存储 则主要使用 store 进行

所以,接下来,我们主要看一下 broker 与 store 的实现。

broker

数据结构

broker 是一个消息代理,它的内部,持有一个心跳保持对象:

type RemoteBroker struct {  keeper *synckeeper.Keeper  
}

同时还有一个用于建联的配置选项数据结构:

type ConnectOptions struct {  Address     string  MessageType string  BufferSize  int  Cert        tls.Certificate  // for websocket/http  RequestHeader http.Header  
}

方法

Connect

  1. 调用 ConnectFunc 获取 net.Conn
  2. 新建一个消息包装对象
func (broker *RemoteBroker) Connect(opts ConnectOptions, connect ConnectFunc) wrapper.Conn {  conn, err := connect(opts)  ...return wrapper.NewWrapper(opts.MessageType, conn, opts.BufferSize)  
}func Connect(opts broker.ConnectOptions) (interface{}, error) {  conn, err := net.Dial(opts.MessageType, opts.Address)  ... return conn, nil  
}func NewWrapper(connType string, conn interface{}, buffSize int) Conn {  readerType := reader.ReaderTypeRaw  writerType := writer.WriterTypeRaw  return &ConnWrapper{  conn:   conn,  reader: reader.NewReader(readerType, conn, buffSize),  writer: writer.NewWriter(writerType, conn),  }  
}

Send

  1. 调用 conn 的 WriteJSON 发送消息
func (broker *RemoteBroker) Send(conn wrapper.Conn, message model.Message) error {  err := conn.WriteJSON(&message)  ...  return nil  
}

Receive

  1. 设置超时时间—无
  2. 调用 conn 的 ReadJSON 读取消息
  3. 判断消息是否是同步响应消息
  4. 不是就返回消息
  5. 是就发送心跳保持消息
func (broker *RemoteBroker) Receive(conn wrapper.Conn) (model.Message, error) {  var message model.Message  for {  err := conn.SetReadDeadline(time.Time{})  err = conn.ReadJSON(&message)  ...isResponse := broker.keeper.IsSyncResponse(message.GetParentID())  if !isResponse {  return message, nil  }  err = broker.keeper.SendToKeepChannel(message)  }  
}

SendSyncInternal

  1. 设置超时时间,默认10s
  2. 设置同步标志位
  3. 调用 conn 的 WriteJSON 发送消息
  4. 发送失败就返回错误
  5. 成功就调用 conn 的 ReadJSON 等待响应消息
  6. 响应超时返回错误
  7. 响应成功就返回响应消息
func (broker *RemoteBroker) SendSyncInternal(conn wrapper.Conn, message model.Message, timeout time.Duration) (model.Message, error) {  if timeout <= 0 {  timeout = syncMessageTimeoutDefault  }  // make sure to set sync flag  message.Header.Sync = true  err := conn.WriteJSON(&message)  ...deadline := time.Now().Add(timeout)  err = conn.SetReadDeadline(deadline)  var response model.Message  err = conn.ReadJSON(&response)  ...return response, nil  
}

Store

数据结构

type PipeStore struct {// key modulepipeMap          map[string]PipeInfo  pipeMapLock      sync.RWMutex  // key group modulegroupPipeMap     map[string]map[string]PipeInfo  groupPipeMapLock sync.RWMutex  
}type PipeInfo struct {  pipe interface{}  
}

PipeInfo 存储的是一个通信对象,主要有以下几种:

  • chan model.Message
  • net.Conn
  • wrapper.Conn

方法就不过多介绍,与其他模块差异不大,因为它主要的功能就是存储。所以都是CRUD。

Wapper

wapper 包装了一个自定义的网络连接,定一个了Conn接口。其数据结构如下:

type Conn interface {  Read() ([]byte, error)  Write(message []byte) error  ReadJSON(obj interface{}) error  WriteJSON(obj interface{}) error  Close() error  SetReadDeadline(t time.Time) error  
}  // ConnWrapper conn wrapper
type ConnWrapper struct {  conn   interface{}  reader reader.Reader  writer writer.Writer  
}

对与 Conn 接口,她的实际实现主要有两种(纯粹实现就只有 ConnWrapper,但是实际上 ConnWrapper 也是调用的底层实现),Raw 和 Package 两种,其中:

  • Raw: 就是常规的buffer 实现,直接通过 net.Conn 进行操作
  • Package:则是自定义的一个消息协议,将消息封装后在通过 net.Conn 通信。

Packer

当前未使用

Packer 自定义了通信协议,它将每条消息的前12个字节作为消息头。

  • [:4] 魔术位–无实义 4 字节
  • (4:6] 版本位–存储版本号 2字节
  • (6:8] 保留位 2字节
  • (8:12] 长度位 – 存储消息长度 4字节

pack

const (  magicSize    = 4  versionSize  = 2  reservedSize = 2  // MessageLenOffest message len offest   MessageLenOffest = magicSize + versionSize + reservedSize  // MessageOffset message offset   MessageOffset = MessageLenOffest + 4  // HeaderLen header len   HeaderLen = MessageOffset  
)
type Packer struct {  Magic    [magicSize]byte  Version  [versionSize]byte  Reserved [reservedSize]byte  Length   int32  Message  []byte  
}

Read

func (p *Packer) Read(reader io.Reader) error {// 读取魔术位err := binary.Read(reader, binary.BigEndian, &p.Magic)  ... // 读取版本位err = binary.Read(reader, binary.BigEndian, &p.Version)  ... // 读取保留位 err = binary.Read(reader, binary.BigEndian, &p.Reserved)  ...// 读取长度位err = binary.Read(reader, binary.BigEndian, &p.Length)  ... // 读取消息实体err = binary.Read(reader, binary.BigEndian, &p.Message)  ...return err  
}

Write

var (  headerTags = [HeaderLen]byte{'b', 'e', 'e', 'h', 'v', '1', 'r', 'v', 0, 0, 0, 0}  
)func (p *Packer) Write(writer io.Writer) error {  // 通过位运算写入数据长度headerTags[MessageLenOffest] = byte(uint32(p.Length) >> 24)  headerTags[MessageLenOffest+1] = byte(uint32(p.Length) >> 16)  headerTags[MessageLenOffest+2] = byte(uint32(p.Length) >> 8)  headerTags[MessageLenOffest+3] = byte(uint32(p.Length)) // 写入消息头 err := binary.Write(writer, binary.BigEndian, &headerTags)  ... // 写入消息实体err = binary.Write(writer, binary.BigEndian, &p.Message)  ... return nil  
}

到此,我们基本上介绍完了它的内部核心实现,接下来,我们来看看的外层包装

Module

我们在上述的章节中一直提到了 modlue 这样一个概念,在 kubeedge 中,它通过 Module 这样一个接口来约定,只要实现了这个接口,kubeedge 就认为你是一个 modlue。

type Module interface {  Name() string  Group() string  Start()  Enable() bool  
}

core 模块通过 ModuleInfo 存储单个 modlue 的信息,各个模块则使用 Register() 方法将自己注册进来,然后所有的 ModuleInfo 会存储到 modules map 和 disabledModules map 中,并通过 GetModules() 方法将 module 暴露出去。

var (  // Modules map  modules         map[string]*ModuleInfo  disabledModules map[string]*ModuleInfo  
)
type ModuleInfo struct {  contextType string  remote      bool  module      Module  
}func Register(m Module, opts ...string) {  info := &ModuleInfo{  module:      m,  contextType: common.MsgCtxTypeChannel,  remote:      false,  }  if len(opts) > 0 {  info.contextType = opts[0]  info.remote = true  }  if m.Enable() {  modules[m.Name()] = info  } else {  disabledModules[m.Name()] = info  }  
}
func GetModules() map[string]*ModuleInfo {  return modules  
}

StartModules

  1. 默认初始化 Context 为 channel 类型
  2. 获取所有已注册的 modules
  3. 遍历 modules 将各个 module 加入到Context
  4. 启动 module
func StartModules() {  // only register channel mode, if want to use socket mode, we should also pass in common.MsgCtxTypeUS parameter  beehiveContext.InitContext([]string{common.MsgCtxTypeChannel})  modules := GetModules()  for name, module := range modules {  var m common.ModuleInfo  switch module.contextType {  case common.MsgCtxTypeChannel:  m = common.ModuleInfo{  ModuleName: name,  ModuleType: module.contextType,  }  case common.MsgCtxTypeUS:  m = common.ModuleInfo{  ModuleName: name,  ModuleType: module.contextType,  // the below field ModuleSocket is only required for using socket.  ModuleSocket: common.ModuleSocket{  IsRemote: module.remote,  },  }  default:  klog.Exitf("unsupported context type: %s", module.contextType)  }  beehiveContext.AddModule(&m)  beehiveContext.AddModuleGroup(name, module.module.Group())  go moduleKeeper(name, module, m)  klog.Infof("starting module %s", name)  }  
}func moduleKeeper(name string, moduleInfo *ModuleInfo, m common.ModuleInfo) {  for {  moduleInfo.module.Start()  // local modules are always online  if !moduleInfo.remote {  return  }  // try to add module for remote modules  beehiveContext.AddModule(&m)  beehiveContext.AddModuleGroup(name, moduleInfo.module.Group())  }  
}

GracefulShutdown

  1. 开启一个通道接收系统信号
  2. 接收到信号就执行 Context.Cancel() 方法
  3. 获取所有已注册的 modules
  4. 调用 Context.Cleanup() 方法 清理module.
func GracefulShutdown() {  c := make(chan os.Signal, 1)  signal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM,  syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT)  s := <-c  klog.Infof("Get os signal %v", s.String())  // Cleanup each modules  beehiveContext.Cancel()  modules := GetModules()  for name := range modules {  klog.Infof("Cleanup module %v", name)  beehiveContext.Cleanup(name)  }  
}

http://chatgpt.dhexx.cn/article/7k9kPc8I.shtml

相关文章

谷粒商城简介(1~5集)

谷粒商城简介&#xff08;1~5集&#xff09; 一、项目简介 1、项目背景 1&#xff09;、电商模式 市面上有 5 种常见的电商模式 B2B、B2C、C2B、C2C、O2O&#xff1b; 1、B2B 模式 B2B (Business to Business)&#xff0c; 是指商家与商家建立的商业关系。 如&#xff1a;阿…

谷粒商城:分布式基础概念(2)

微服务 微服务架构风格&#xff0c;就像是把一个单独的应用程序开发为一套小服务&#xff0c;每个小服务运行在自 己的进程中&#xff0c;并使用轻量级机制通信&#xff0c;通常是 HTTP API。这些服务围绕业务能力来构建&#xff0c; 并通过完全自动化部署机制来独立部署。这些…

beetl,freemarker,thymeleaf对比及springboot集成

调研类型&#xff1a; Freemarker&#xff0c;Thymeleaf&#xff0c;Beetl&#xff0c;Velocity 调研方向&#xff1a; 性能&#xff0c;活跃度&#xff0c;各自优缺点&#xff0c;应用实例 2.1、性能报告&#xff1a; Jdk:1.8 Cpu: 8核12线程 Jvm : -Xms512m -Xmx512m B…

部分壳与脱壳

壳与脱壳 对网上部分壳与脱壳的摘录与总结&#xff0c;仅供参考&#xff0c;侵删 参考链接1 https://www.52pojie.cn/thread-138380-1-1.html 参考链接2 https://www.cnblogs.com/milantgh/p/3869083.html 参考链接3 http://blog.sina.com.cn/s/blog_3e28c8a5010132m6.html 壳…

谷粒商城项目学-分布式基础

项目框架图 分布式基础概念 • 微服务、注册中心、配置中心、远程调用、Feign、网关 • 2、基础开发 • SpringBoot2.0、SpringCloud、Mybatis-Plus、Vue组件化、阿里云对象存储 • 3、环境 • Vagrant、Linux、Docker、MySQL、Redis、逆向工程&人人开源 • 4、开发规范 •…

【笔记/后端】谷粒商城基础篇

目录 一、环境配置1 Docker1.1 Docker是什么&#xff1f;1.2 安装&启动1.2.1 阿里云镜像加速 1.3 安装MySQL1.4 安装Redis 2 开发环境2.1 Maven2.2 Git2.3 Node 二、创建微服务项目1 内容2 问题记录3 renren-generator 三、分布式组件1 Nacos1.1 注册中心1.2 配置中心1.2.1…

谷粒商城(二)

谷粒商城&#xff08;二&#xff09; 后台商品服务 - 三级分类1、查询1&#xff09;、接口编写2&#xff09;、树形展示三级分类数据3&#xff09;、配置网关路由1 更改前端 base 路径2 将服务注册进nacos3 网关模块配置路由4 测试 4&#xff09;、解决跨域 2、删除1&#xff0…

谷粒商城(五)

谷粒商城&#xff08;五&#xff09; 订单服务1、环境搭建1&#xff09;、页面2&#xff09;、代码 2、订单登录拦截3、订单确认页1&#xff09;、VO模型2&#xff09;、订单确认页数据查询1 接口编写2 调用远程服务 3&#xff09;、Feign远程调用丢失请求头启动服务报错解决 4…

谷粒商城(一)

谷粒商城&#xff08;一&#xff09; 1、环境搭建安装 dockerdocker 安装 mysqldocker 安装 redis安装配置 git准备工具 IDEA、VsCode从 gitee 初始化项目 2、创建微服务项目1&#xff09;、创建项目2&#xff09;、初始化数据库 3、使用人人开源搭建后台管理系统1&#xff09;…

谷粒商城:如何通过笔记复盘实现事半功倍?

前言 把谷粒商城做了一遍&#xff0c;其中遇的困难也记录了一下。将零散的笔记整理成有顺序的目录结构。方便自己回看、以及快速定位文章。特此记录、大部分在CSDN博客里边都可以搜索到。 大家想看的话也可以去这里看看&#xff1a;笔记地址传送门 后续还会继续维护这个笔记…

查壳、加壳、脱壳详细教程

查壳教程 1、打开软件后我们点击右上角的三个点&#xff0c;会弹出一个选择文件的窗口&#xff0c;我们选择要查壳的文件&#xff0c;同样也可以直接把需要查壳的软件拖到PEID页面里 2、这里拖入一个程序后出现如下信息页面 这里我们看到Borland Delphi 3.0,他不是一种壳&…

分布式项目-谷粒商城。

分布式项目一&#xff0c;分布图 二&#xff0c;环境搭建 1.安装linux 2.安装docker 1 卸载系统之前的docker sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine2 设…

竞业限制是什么意思?

竞业限制是指用人单位与掌握商业秘密的职工约定在劳动合同解除或终止后的一定期限内&#xff0c;劳动者不得到有竞争关系的其他用人单位任职&#xff0c;也不得自己生产与原单位有竞争关系的同类产品或经营同类业务。 竞业限制对不同的人意义是不同的&#xff0c;比如&#xf…

所谓的1261考核法算不算是末尾淘汰?

【问题】 有无精通劳动法的老哥&#xff0c;分析一下所谓的1261考核法算不算是末尾淘汰&#xff1f; 已知最后考核的那个1会被约谈&#xff0c;连续两次都是最后就会调岗或者解除劳动合同 【解析】 算的&#xff01; 应该是“末位淘汰制” 劳动法不支持“末位淘汰制”&…

数字滤波器的实现——低通滤波器再探究

在探究完滤波器原理之后&#xff0c;又面临一个问题就是数字滤波器如何实现的问题&#xff0c;因为在实际应用过程中&#xff0c;如果不接触硬件的话&#xff0c;低通滤波器一般都是通过编程实现的&#xff0c;具体代码应该怎么编写&#xff0c;在应用过程中又应该注意什么问题…

数字图像处理之低通滤波器实现原理及方法(Matlab)

1.傅里叶变换与频域 在之前的文中&#xff0c;我们已经进行过一些基本的图像处理。比如&#xff0c;使用低通滤波可以将图像模糊&#xff0c;也有些许降噪的作用。这些都是在空间域内进行的滤波处理&#xff0c;这个处理主要是依靠卷积来进行计算的。首先&#xff0c;从连续的一…

滤波器_理想低通/高通滤波器原理

1.滤波器作用 消除干扰杂讯噪声&#xff0c;对信号进行频率成分的选择2.高通滤波 过滤低频信息&#xff0c;让高频信息通过3.低通滤波 过滤高频信息&#xff0c;让低频信息通过4.理想低通滤波 D0表示通带半径&#xff0c;D(u&#xff0c;v)是到频谱中心的距离(欧式距离),公式…

带通滤波器电路图大全(三款带通滤波器电路设计原理图详解)

带通滤波器电路图设计&#xff08;一&#xff09; 传统的带通滤波器设计方法中涉及了很多复杂的理论分析和计算。针对上述缺点&#xff0c;介绍一种使用EDA软件进行带通滤波器的设计方案&#xff0c;详细阐述了使用FilterPro软件进行有源带通滤波器电路的设计步骤&#xff0c;…

T滤波器(低通滤波器)

1.电路原理 T滤波器&#xff0c;其基本原理是基于低通滤波器设计&#xff0c;实现阻高频通低频的需求&#xff0c;其电路图及传递函数如下。 2.传递函数 r(t)为输入,c(t)为输出&#xff0c;从电路原理我们得到输入输出公式&#xff1a; 将公式进行拉氏变换得到&#xff1a; 3.系…

低通滤波器和高通滤波器的程序实现原理推导

傅立叶变换,拉普拉斯变换和Z变换 对于信号分析而言,傅立叶变换是必不可少的,我们都知道傅立叶变换是把系统从时域变换到频域进行分析,那么拉普拉斯变换和Z变换是干什么的?简单的来说,由于傅里叶变换的收敛有一个狄利克雷条件&#xff0c;要求信号绝对可积/绝对可和。对于那些…