ICMP协议编程实践:实现ping命令(Python语言)

众所周知, ping 命令通过 ICMP 协议探测目标 IP 并计算 往返时间 。 本文使用 Python 语言开发一个 ping 命令, 以演示如何通过 套接字 发送接收 ICMP 协议报文。

注解

程序源码 可在本文末尾复制,或者在 Github 上下载: ping.py

报文封装

ICMP 报文承载在 IP 报文之上,头部结构非常简单:

../_images/476c9d2e44224eaa078f80bdbad440f9.gif

注意到, ICMP 头部只有三个固定字段,其余部分因消息类型而异。固定字段如下:

  • type消息类型
  • code代码
  • checksum校验和

ICMP 报文有很多不同的类型,由 typecode 字段区分。 而 ping 命令使用其中两种:

../_images/c633276d3679c45943a4f2d7c2b55e05.png

ping命令原理

如上图,机器 A 通过 回显请求 ( Echo Request ) 询问机器 B ; 机器 B 收到报文后通过 回显答复 ( Echo Reply ) 响应机器 A 。 这两种报文的典型结构如下:

../_images/31beaa9ddfb5278c7cd98dc4c8624a5b.png

对应的 type 以及 code 字段值列举如下:

表-1 回显报文类型
名称 类型 ”代码“
回显请求 8 0
回显答复 0 0

按照惯例,回显报文除了固定字段,其余部分组织成 3 个字段:

  • 标识符 ( identifier ),一般填写进程 PID 以区分其他 ping 进程;
  • 报文序号 ( sequence number ),用于编号报文序列;
  • 数据 ( data ),可以是任意数据;

ICMP 规定, 回显答复 报文原封不动回传这些字段。 因此,可以将 发送时间 封装在 数据负载 ( payload )中, 收到答复后将其取出,用于计算 往返时间 ( round trip time )。

Python 标准库 struct 模块提供了用于 封装网络报文 的工具,可以这样封装数据负载:

import struct

sending_ts = time.time()
payload = struct.pack('!d', sending_ts)

这段代码将当前时间戳封装起来,其中 ! 表示 网络 字节序d 表示双精度浮点。

封装报文头部也是类似的:

header = struct.pack('!BBHHH', _type, code, checksum, ident, seq)
icmp = header + payload

其中, B 表示长度为一个字节的无符号整数, H 表示长度为两个字节的无符号整数。

校验和

ICMP 报文校验和字段需要自行计算,计算步骤如下:

  1. 0 为校验和封装一个用于计算的 伪报文
  2. 将报文分成两个字节一组,如果总字节数为奇数,则在末尾追加一个零字节;
  3. 对所有 双字节 进行按位求和;
  4. 将高于 16 位的进位取出相加,直到没有进位;
  5. 将校验和按位取反;

示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def calculate_checksum(icmp):
    if len(icmp) % 2:
        icmp += b'\00'

    checksum = 0
    for i in range(len(icmp)//2):
        word, = struct.unpack('!H', icmp[2*i:2*i+2])
        checksum += word

    while True:
        carry = checksum >> 16
        if carry:
            checksum = (checksum & 0xffff) + carry
        else:
            break

    checksum = ~checksum & 0xffff

    return struct.pack('!H', checksum)

套接字

编程实现网络通讯,离不开 套接字 ( socket ),收发 ICMP 报文当然也不例外:

from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP

s = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)

调用 sendto 系统调用发送 ICMP 报文:

s.sendto(icmp, 0, ('xxx.xxx.xxx.xxx', 0))

其中,第一个参数为封装好的 ICMP 报文; 第二个参数为发送标志位,无特殊要求一般填 0 ; 第三个参数为目的 IP 地址-端口对,端口这里填 0

调用 recvfrom 方法接收 ICMP 报文:

ip, (src_ip, _) = s.recvfrom(1500)
icmp = ip[20:]

参数为接收缓冲区大小,这里用 1500 刚好是一个典型的 MTU 大小。 注意到, recvfrom 系统调用返回 IP 报文,去掉前 20 字节的 IP 头部便得到 ICMP 报文。

注解

注意,创建 原始套接字 ( SOCK_RAW )需要超级用户权限。

程序实现

掌握基本原理后,便可着手编写代码了。

首先,实现 pack_icmp_echo_request 函数,用于封装 ICMP 回显请求 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def pack_icmp_echo_request(ident, seq, payload):
    pseudo = struct.pack(
        '!BBHHH',
        8,
        0,
        0,
        ident,
        seq,
    ) + payload
    checksum = calculate_checksum(pseudo)
    return pseudo[:2] + checksum + pseudo[4:]

2-9 行封装用于计算校验和的 伪报文 , 注意到 类型 字段为 8代码 字段为 0校验和 字段为 0标识符序号 以及 数据负载 字段由参数指定; 第 10 行调用 calculate_checksum 函数计算 校验和 ; 第 11 行替换伪报文中的校验和并返回。

对应地,实现 unpack_icmp_echo_reply 用于解析 ICMP 回显答复 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def unpack_icmp_echo_reply(icmp):
    _type, code, _, ident, seq, = struct.unpack(
        '!BBHHH',
        icmp[:8]
    )
    if _type != 0:
        return
    if code != 0:
        return

    payload = icmp[8:]

    return ident, seq, payload

接着,实现 send_routine 用于循环发送 ICMP 回显请求 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def send_routine(sock, addr, ident, magic, stop):
    # first sequence no
    seq = 1

    while not stop:
        # currrent time
        sending_ts = time.time()

        # packet current time to payload
        # in order to calculate round trip time from reply
        payload = struct.pack('!d', sending_ts) + magic

        # pack icmp packet
        icmp = pack_icmp_echo_request(ident, seq, payload)

        # send it
        sock.sendto(icmp, 0, (addr, 0))

        seq += 1
        time.sleep(1)

该函数需要 5 个参数,分别如下:

  • sock ,用于发送报文的 套接字
  • addr ,目标 IP 地址
  • ident标识符
  • magic ,打包在数据负载中的魔性字符串;
  • stop ,停止发送标识;

3 行定义 报文序号 ,从 1 开始递增; 接着是发送循环,不停发包,每次相隔一秒; 第 7 行获取 发送时间戳 ; 第 11 行将时间戳以及魔性字符串打包成 数据负载 ; 第 14 行调用 pack_icmp_echo_request 封装 回显请求 报文; 第 17 行调用 sendto 系统调用 发送报文 ; 第 19-20 行自增发送序号并等待一秒。

同样,实现 recv_routine 函数用于循环接收 ICMP 回显答复 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def recv_routine(sock, ident, magic):
    while True:
        # wait for another icmp packet
        ip, (src_addr, _) = sock.recvfrom(1500)

        # unpack it
        result = unpack_icmp_echo_reply(ip[20:])
        if not result:
            continue

        # print info
        _ident, seq, payload = result
        if _ident != ident:
            continue

        sending_ts, = struct.unpack('!d', payload[:8])
        print('%s seq=%d %5.2fms' % (
            src_addr,
            seq,
            (time.time()-sending_ts) * 1000,
        ))

4 行调用 recvfrom 系统调用接收 ICMP 报文; 第 7 行调用 unpack_icmp_echo_reply 解析报文 ; 第 8-9 行忽略非回显答复报文; 第 13-14 行检查标识符并忽略非法报文(可能是响应其他进程的); 第 16 行从 数据负载 中取出 发送时间戳 ; 第 17-21 行,计算 往返时间 并输出提示。

报文 发送接收 均实现完毕,如何让程序同时干两件事情呢? 可以选用 线程 方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def ping(addr):
    # create socket for sending and receiving icmp packet
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

    # id field
    ident = os.getpid()
    # magic string to pad
    magic = b'1234567890'

    # sender thread stop flag
    # append anything to stop
    sender_stop = []

    # start sender thread
    # call send_routine function to send icmp forever
    args = (sock, addr, ident, magic, sender_stop,)
    sender = threading.Thread(target=send_routine, args=args)
    sender.start()

    try:
        # receive icmp reply forever
        recv_routine(sock, ident, magic)
    except KeyboardInterrupt:
        pass

    # tell sender thread to stop
    sender_stop.append(True)

    # clean sender thread
    sender.join()

    print()

3 行创建用于发送、接收报文的 套接字 ; 第 6 行获取进程 PID 作为 标识符 ; 第 16-18 行启动一个 子线程 执行 报文发送 函数; 第 20-24主线程 执行 报文接收 函数直至用户按下 ctrl-C ; 第 27 行程序退出前,通知发送线程退出并回收线程资源( join )。

将以上所有代码片段组装在一起,便得到 ping.py 命令。 迫不及待想运行一下:

$ sudo python ping.py 8.8.8.8
8.8.8.8 seq=1 23.18ms
8.8.8.8 seq=2 22.25ms
8.8.8.8 seq=3 34.18ms

It works!

程序源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python
# -*- encoding=utf8 -*-

'''
FileName:   ping.py
Author:     Fasion Chan
@contact:   fasionchan@gmail.com
@version:   $Id$

Description:

Changelog:

'''

import os
import socket
import struct
import sys
import threading
import time

def calculate_checksum(icmp):
    if len(icmp) % 2:
        icmp += b'\00'

    checksum = 0
    for i in range(len(icmp)//2):
        word, = struct.unpack('!H', icmp[2*i:2*i+2])
        checksum += word

    while True:
        carry = checksum >> 16
        if carry:
            checksum = (checksum & 0xffff) + carry
        else:
            break

    checksum = ~checksum & 0xffff

    return struct.pack('!H', checksum)

def calculate_checksum(icmp):
    highs = icmp[0::2]
    lows = icmp[1::2]

    checksum = ((sum(highs) << 8) + sum(lows))

    while True:
        carry = checksum >> 16
        if carry:
            checksum = (checksum & 0xffff) + carry
        else:
            break

    checksum = ~checksum & 0xffff

    return struct.pack('!H', checksum)

def pack_icmp_echo_request(ident, seq, payload):
    pseudo = struct.pack(
        '!BBHHH',
        8,
        0,
        0,
        ident,
        seq,
    ) + payload
    checksum = calculate_checksum(pseudo)
    return pseudo[:2] + checksum + pseudo[4:]

def unpack_icmp_echo_reply(icmp):
    _type, code, _, ident, seq, = struct.unpack(
        '!BBHHH',
        icmp[:8]
    )
    if _type != 0:
        return
    if code != 0:
        return

    payload = icmp[8:]

    return ident, seq, payload

def send_routine(sock, addr, ident, magic, stop):
    # first sequence no
    seq = 1

    while not stop:
        # currrent time
        sending_ts = time.time()

        # packet current time to payload
        # in order to calculate round trip time from reply
        payload = struct.pack('!d', sending_ts) + magic

        # pack icmp packet
        icmp = pack_icmp_echo_request(ident, seq, payload)

        # send it
        sock.sendto(icmp, 0, (addr, 0))

        seq += 1
        time.sleep(1)

def recv_routine(sock, ident, magic):
    while True:
        # wait for another icmp packet
        ip, (src_addr, _) = sock.recvfrom(1500)

        # unpack it
        result = unpack_icmp_echo_reply(ip[20:])
        if not result:
            continue

        # print info
        _ident, seq, payload = result
        if _ident != ident:
            continue

        sending_ts, = struct.unpack('!d', payload[:8])
        print('%s seq=%d %5.2fms' % (
            src_addr,
            seq,
            (time.time()-sending_ts) * 1000,
        ))

def ping(addr):
    # create socket for sending and receiving icmp packet
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

    # id field
    ident = os.getpid()
    # magic string to pad
    magic = b'1234567890'

    # sender thread stop flag
    # append anything to stop
    sender_stop = []

    # start sender thread
    # call send_routine function to send icmp forever
    args = (sock, addr, ident, magic, sender_stop,)
    sender = threading.Thread(target=send_routine, args=args)
    sender.start()

    try:
        # receive icmp reply forever
        recv_routine(sock, ident, magic)
    except KeyboardInterrupt:
        pass

    # tell sender thread to stop
    sender_stop.append(True)

    # clean sender thread
    sender.join()

    print()

if __name__ == '__main__':
    ping(sys.argv[1])

下一步

本节以 C 语言为例,演示了 ICMP 编程方法。 如果你对其他语言感兴趣,请按需取用:

订阅更新,获取更多学习资料,请关注我们的 微信公众号

../_images/wechat-mp-qrcode.png

小菜学编程