介绍
最近在做一个关于无线移动自组织网络相关的内容,移动自组织网络是由彼此已经发现和互相接近的且有通信需求的移动设备构成的。由于网内节点的移动拓扑是不可知的,所以各自组网结构是动态变化的。要根据其各个实时动态形成的自组网开发新的内容,这就需要一个可以探测网内节点数量的工具来设定当前自组网内状态。在网上找寻相关工具资料的时候想到OSPF的hello组播包,觉得这个实现很不错,于是尝试用go写了个udp组播实现的探测网内节点的工具,分享下。
其实现原理就是使用定时器定时触发helllo组播包给组播域,如果节点接收到其他节点发送的组播包就记录下。每一个总检测周期内查询接收的节点,根据接收到的节点名就可以知道该节点的自组网下节点有哪些。例如场景中有1-10个节点,1节点能够收到2-7节点的组播包说明1-7节点在同一自组网内属于该自组网的存活节点。(非存活节点并不是表示该节点down,仅仅是代表该节点例如节点8不在这个自组网内)
实现过程
go中对udp等套接字的官方包实现类似unix,基本的套接字知识就不在这回顾了,总之要了解下udp也是可以使用connect函数做有连接的请求,并不是说如tcp那般握手提供可靠连接,仅仅是单方面的确定源目标地址的连接。
组播定义:组播是指在IP网络中将数据包以尽力传送的形式发送到某个确定的节点集合(即组播组),其基本思想是:源主机(即组播源)只发送一份数据,其目的地址为组播组地址;组播组中的所有接收者都可收到同样的数据拷贝,并且只有组播组内的主机可以接收该数据,而其它主机则不能收到。
基础的用go的套接字示例实现看看深入Go UDP编程这篇博客即可,简单易用。组播部分代码也是使用在这篇文章的通用多播编程代码1改写的
// 组播服务器代码
func main() {
//如果第二参数为nil,它会使用系统指定多播接口,但是不推荐这样使用
addr, err := net.ResolveUDPAddr("udp", "224.0.0.250:9981")
if err != nil {
fmt.Println(err)
}
listener, err := net.ListenMulticastUDP("udp", nil, addr)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Local: <%s> \n", listener.LocalAddr().String())
data := make([]byte, 1024)
for {
n, remoteAddr, err := listener.ReadFromUDP(data)
if err != nil {
fmt.Printf("error during read: %s", err)
}
fmt.Printf("<%s> %s\n", remoteAddr, data[:n])
}
}
// 组播客户端代码
func main() {
ip := net.ParseIP("224.0.0.250")
srcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0}
dstAddr := &net.UDPAddr{IP: ip, Port: 9981}
conn, err := net.DialUDP("udp", srcAddr, dstAddr)
if err != nil {
fmt.Println(err)
}
defer conn.Close()
conn.Write([]byte("hello"))
fmt.Printf("<%s>\n", conn.RemoteAddr())}
build代码后运行发现客户端运行报错:dial udp 0.0.0.0:0->224.0.0.250:9981: connect: network is unreachable
.
客户端的错误应该路由不可达没配路由所以无法到达224网段。可以设置路由解决问题,例如ip route add 224.0.0.0/8 dev ens33
但这在用户层面操作太蠢了,而且会干扰原有的路由表。应该在代码处指定发送方网卡用该网卡ip发送组播包。所以修改了下客户端的代码,其中客户端传参需要发送组播ip的网卡。
其中获取网卡ip的函数gain_ip是参考这篇文章2改的。我这里的功能修改为读出输入网卡的任意一个有效的ipv4地址。
//DialUDP的srcAddr设置0.0.0.0:0 或nil会自己找匹配路由 端口号为0表示系统分配端口
func main() {
//选择使用那张网卡发送组播包(取出其ip作源地址), 不输入参数网卡设定为空 eg. eth0
var ethAddr net.IP = nil
if len(os.Args) > 1 {
ethAddr = gain_ip(os.Args[1])
}
if ethAddr == nil {
fmt.Printf("Failed to retrieve valid ipv4 address as source address,\nsearch for matching route to send\n")
ethAddr = net.IPv4zero
}
srcAddr := &net.UDPAddr{IP: ethAddr, Port: 0}
dstAddr, _ := net.ResolveUDPAddr("udp", "224.0.0.250:9981")
conn, err := net.DialUDP("udp", srcAddr, dstAddr)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
conn.Write([]byte("hello"))
fmt.Printf("<%s>\n", conn.RemoteAddr())
}
// 取得网卡的任意一个有效ipv4地址
func gain_ip(ethname string) net.IP {
eth, _ := net.InterfaceByName(ethname)
Addrs, err := eth.Addrs()
if err != nil {
// fmt.Println(err) //网卡无效
return nil
}
for _, Addr := range Addrs {
if ipnet, ok := Addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { //接口转为结构体
if ipnet.IP.To4() != nil {
return ipnet.IP.To4()
}
}
}
return nil //找不到有效网卡地址
}
运行服务器端发现也有报错:setsockopt:no such device
,查看了ListenMulticastUDP函数的源码,发现上面的例程ListenMulticastUDP没有传入第二个接口参数,会使得程序找寻默认到该组播域的路由所以配路由也能解决该问题。我这里还是传参一个网卡名称解决。
func main() {
var ethname string = ""
if len(os.Args) > 1 {
ethname = os.Args[1]
}
/* listener, err := net.ListenMulticastUDP("udp", nil, Multicast_addr)
ListenMulticastUDP监听本地系统的所有可用IP地址,包括组播组IP地址。
如果ifi(第二个参数)为零,ListenMulticastUDP使用系统分配的多播接口,但不建议这样做,
因为分配取决于平台,有时可能需要路由配置。如果gaddr的端口字段为0,则会自动选择一个端口号。
*/
eth, _ := net.InterfaceByName(ethname) //选择使用那张网卡加入组播域
Multicast_addr, _ := net.ResolveUDPAddr("udp", "224.0.0.250:9981")
listener, err := net.ListenMulticastUDP("udp", eth, Multicast_addr)
if err != nil {
fmt.Println(err)
return
}
defer listener.Close()
fmt.Printf("Listener Multicast addr: <%s> \n", Multicast_addr.String())
data := make([]byte, 1024)
for {
n, remoteAddr, err := listener.ReadFromUDP(data)
if err != nil {
fmt.Printf("error during read: %s", err)
}
fmt.Printf("<src ip: %s> %s\n", remoteAddr, data[:n])
}
}
示例代码
准确的相对时间触发对分布式系统尤为重要,虽然只是简单的调用定时器,但是这位大佬的博客3对go中定时器的有比较好的分析,打日志的参考文章4也标注出来。
最后整合了下客户端和服务器接收,每个移动设备运行该程序(定时发动组播包,后台协程监听来自组播域其他节点包消息)就能探测各网内存活/在线设备。关于接收数据的处理我这里故意只打日志不做其他处理,关于接收的数据如何处理,处理数据具体干啥就看你自己的用途了。就像我参考的这些文章一样,希望能对读者有所帮助。
package main
import (
"fmt"
"io"
"log"
"net"
"os"
"time"
)
type myconn struct {
send *net.UDPConn
recv chan int
}
var (
Info *log.Logger
Warning *log.Logger
Error *log.Logger
)
func init() {
logfile, err := os.OpenFile("Nodes_multicast.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalln("打开日志文件失败:", err)
}
Info = log.New(logfile, "Info: ", log.Lmicroseconds|log.Lshortfile)
Warning = log.New(io.MultiWriter(os.Stdout, logfile), "Warning: ", log.Lmicroseconds|log.Lshortfile)
Error = log.New(io.MultiWriter(os.Stderr, logfile), "Error: ", log.Lmicroseconds|log.Lshortfile)
}
func main() {
// 使用参数1的网卡发送和监听组播包(取出其ip作源地址), 不输入参数使用默认路由
// 参数2代表运行客户端nem节点号,默认为1
var ethname, nemid string = "", "1"
if len(os.Args) > 2 {
ethname = os.Args[1]
nemid = os.Args[2]
} else {
Warning.Println("No parameter is entered,now using default route and nemid:", nemid)
}
conn, err := multicast_init(ethname)
if err != nil {
Error.Println("multicast init error:", err)
return
}
defer conn.send.Close()
Info.Println("Nodes start multicast :")
send_interval := time.Tick(time.Second * 3)
check_interval := time.Tick(time.Second * 15)
for {
select {
case <-send_interval:
Info.Println("\"send msg\"")
// s := []byte(fmt.Sprintf("Hello,I'm %s", nemid))
s := []byte("Hello,I'm " + nemid)
conn.send.Write(s)
case <-check_interval:
Info.Println("\"check schedul\"")
case num := <-conn.recv:
Info.Println("\"recv msg\" ", num)
}
}
}
// 取得网卡的任意一个有效ipv4地址
func gain_ip(ethname string) net.IP {
eth, _ := net.InterfaceByName(ethname)
Addrs, err := eth.Addrs()
if err != nil {
// Warning.Println(err) //网卡无效
return nil
}
for _, Addr := range Addrs {
if ipnet, ok := Addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { //接口强转为结构体
if ipnet.IP.To4() != nil {
return ipnet.IP.To4()
}
}
}
return nil
}
// udp组播的发送初始化,并开始监听组播域
func multicast_init(ethname string) (myconn, error) {
ethAddr := gain_ip(ethname)
if ethAddr == nil {
Warning.Printf("Failed to retrieve valid ipv4 address as source address,\nsearch for matching route to send\n")
ethAddr = net.IPv4zero
}
srcAddr := &net.UDPAddr{IP: ethAddr, Port: 0}
Multicast_addr, _ := net.ResolveUDPAddr("udp", "224.0.0.250:9981")
Info.Println("send addr is", srcAddr, "ListenMulticast is", Multicast_addr)
send, err := net.DialUDP("udp", srcAddr, Multicast_addr)
if err != nil {
return myconn{}, err
}
eth, _ := net.InterfaceByName(ethname)
listener, err := net.ListenMulticastUDP("udp", eth, Multicast_addr)
if err != nil {
send.Close()
return myconn{}, err
}
data := make([]byte, 1024)
datach := make(chan int, 0)
go func() { // go的匿名函数默认捕获上下文变量
defer listener.Close()
for {
n, remoteAddr, err := listener.ReadFromUDP(data)
if err != nil {
Warning.Printf("error during read: %s", err)
continue
}
s := string(data[:n])
if remoteAddr.IP.Equal(ethAddr) {
continue // 排除自身发出的组播包
}
// Info.Printf("receive %s %d bytes: %s\n", remoteAddr, n, s)
_, err = fmt.Sscanf(s, "Hello,I'm %d", &n)
if err != nil {
Warning.Printf("error during Sscanf: %s", err)
continue
}
datach <- n
}
}()
return myconn{send: send, recv: datach}, nil
}
后续内容
其实我使用这个工具只是做的TDMA动态时隙的统计,大致做法是将各个节点的接收数据做记录存入map中。没到检测时间的管道计时到达后开始处理map数据生成时隙表。注意go中的map并不是并发安全的,需要加sync.Mutex互斥锁或sync.RWMutex读写锁来避免竞争冒险。不过好在我是分开在select中处理不会出现上述问题,go中也有人做了并发安全的set5。
关于go的文件读写还是要提下,*bufio.Writer *os.File
这两个接口都实现可writestring函数,下面的写文件代码为了提升效率新建了bufio,bufio 在一定场景下还是很能提升效率的,不过还是需要注意与直接写入文件的异同,防止缓存数据未同步的状况发生。如下,写完文件后需要调用Flush刷新。
func write_schedule(filepath string, newdata []string) error {
file, err := os.Create(filepath)
if err != nil {
return err
}
defer file.Close()
writer := bufio.NewWriter(file)
for _, str := range newdata {
_, err = writer.WriteString(str + "\n")
}
//注意,bufio 通过flush操作将缓冲写入真实的文件的,在关闭文件之前先flush,否则会造成数据丢失的情况。
writer.Flush()
return err
}
之后有可能会分享下后续具体动态时隙内容。
组播探测程序:
组播部分:
每个节点都会通过指定网卡向组播域中发送hello包,定时间隔为 'send_interval_time'
通过加入该组播域监听该组播域内消息,做到类似探测网内存活节点数量目的
定时检测时隙(组播域中成员是否有变化),检测时间为 'check_interval_time'
即一次检测时隙周期内接收到对应nemid组播包即认为该节点存活
收发包格式为 "Hello,I'm $nemid"
发送控制网时隙消息:
确定主控制网网桥IP,例如172.16.0.254
接收的组播记录存储至 并发安全set中,我这里的实现采用map(并发不安全)
这是因为我使用select管道传输数据再进行修改map就不会出现竞争冒险,造成并发的读写
各个节点信息综合格式为 I'm 1,recv 2 3 4 5
时隙统计程序:
首先打开当前场景的时隙分配xml文件,获取基本参数信息
然后定时检测各个节点发送的节点监测消息,根据优先级固定的顺序分配时隙
一定是先排列完全部节点后再继续按照节点优先级顺序排列时隙,
所以优先级高的节点其分配到时隙的可能性越大
当前优先级 简单的处理为 nemid,id越小 优先级越高,具体参考myschedule_create的实现
检测时间间隔与多播程序的时间间隔相同 'check_interval'
注意:这里仅仅是为了处理方便将所有节点时隙数据整合在一起
其实实际各个节点的时隙是单独分派的,例如若两节点n1 n15分别在两个网内(组播域中未互相探测到)
所以他们的时隙很可能是会有重合冲突的部分,但由于组播无法互相探测到可认为是无影响的。
// 根据时隙表规则,设定以节点id为优先级 固定顺序的排列时隙表,举例 时隙为10
// 1收到 2 3: 顺序:1 2 3 1 2 3 1 2 3 1 1的时隙:0,3,6,9
// 7收到 8 9 4: 顺序:4 7 8 9 4 7 8 9 4 7 7的时隙:1,5,9
// 要推算节点 datas.id 的时隙分布,只要知道时隙个数,优先级排名(id越小,优先级越高)即可推算
鸟窝. 深入Go UDP编程 (colobu.com) 通用多播编程 ↩︎
不二星空. GO实现获取本地IP地址(csdn.net) 获取网卡ip ↩︎
Draveness. Go 语言设计与实现(draveness.me) 并发编程与计时器 ↩︎
飞雪无情. Go语言实战笔记(flysnow.org) 定制go日志 ↩︎