登峰造极境

  • WIN
    • CSharp
    • JAVA
    • OAM
    • DirectX
    • Emgucv
  • UNIX
    • FFmpeg
    • QT
    • Python
    • Opencv
    • Openwrt
    • Twisted
    • Design Patterns
    • Mysql
    • Mycat
    • MariaDB
    • Make
    • OAM
    • Supervisor
    • Nginx
    • KVM
    • Docker
    • OpenStack
  • WEB
    • ASP
    • Node.js
    • PHP
    • Directadmin
    • Openssl
    • Regex
  • APP
    • Android
  • AI
    • Algorithm
    • Deep Learning
    • Machine Learning
  • IOT
    • Device
    • MSP430
  • DIY
    • Algorithm
    • Design Patterns
    • MATH
    • X98 AIR 3G
    • Tucao
    • fun
  • LIFE
    • 美食
    • 关于我
  • LINKS
  • ME
Claves
长风破浪会有时,直挂云帆济沧海
  1. 首页
  2. Programming
  3. Python
  4. 正文

python3实现标准NtpServer时间授时服务器

2025-05-22

源仓库

官方仓库是 Python 2 版本，我将其改成了 Python 3 版本，并且在 Windows 和 Linux 上均已测试通过。

官方仓库:https://github.com/limifly/ntpserver

代码:

import datetime
import socket
import struct
import time
import queue  # Changed from Queue to queue
import threading
import select

# Hand-off queue between the two threads: RecvThread puts
# (data, addr, recvTimestamp) tuples on it and WorkThread takes them off.
taskQueue = queue.Queue()  # Python 3 module name is `queue` (was `Queue` in Python 2)
# Shutdown flag checked at the top of both threads' run() loops; the main
# loop sets it to True when it receives KeyboardInterrupt.
stopFlag = False

def system_to_ntp_time(timestamp):
    """Shift a system-clock timestamp onto the NTP timescale.

    Parameters:
    timestamp -- timestamp expressed in system time

    Returns:
    the timestamp with NTP.NTP_DELTA added to it
    """
    epoch_offset = NTP.NTP_DELTA
    return timestamp + epoch_offset

def _to_int(timestamp):
    """Truncate a timestamp to its integral part.

    Parameters:
    timestamp -- NTP timestamp

    Returns:
    the timestamp converted with int() (fraction discarded)
    """
    whole_part = int(timestamp)
    return whole_part

def _to_frac(timestamp, n=32):
    """Encode the fractional part of a timestamp as an n-bit integer.

    Parameters:
    timestamp -- NTP timestamp
    n         -- number of bits used for the fractional part

    Returns:
    the fraction scaled by 2**n and truncated to an int
    """
    # distance between the timestamp and its truncated value
    remainder = abs(timestamp - _to_int(timestamp))
    scale = 2**n
    return int(remainder * scale)

def _to_time(integ, frac, n=32):
    """Combine an integral and a fixed-point fractional part into a timestamp.

    Parameters:
    integ -- integral part
    frac  -- fractional part, expressed in units of 1/2**n
    n     -- number of bits of the fractional part

    Returns:
    integ plus frac/2**n
    """
    numerator = float(frac)
    fractional_part = numerator / 2**n
    return integ + fractional_part

class NTPException(Exception):
    """Error type this module raises for invalid NTP packets or fields."""

class NTP:
    """Namespace of NTP constants and human-readable lookup tables."""

    # Date of the system epoch, taken from time.gmtime(0).
    _SYSTEM_EPOCH = datetime.date(*time.gmtime(0)[0:3])
    # Date of the NTP epoch.
    _NTP_EPOCH = datetime.date(1900, 1, 1)
    # Seconds between the NTP epoch and the system epoch; add this to a
    # system timestamp to obtain the corresponding NTP timestamp.
    NTP_DELTA = (_SYSTEM_EPOCH - _NTP_EPOCH).days * 24 * 3600

    # Reference identifier -> description.
    REF_ID_TABLE = {
        'DNC': "DNC routing protocol",
        'NIST': "NIST public modem",
        'TSP': "TSP time protocol",
        'DTS': "Digital Time Service",
        'ATOM': "Atomic clock (calibrated)",
        'VLF': "VLF radio (OMEGA, etc)",
        'callsign': "Generic radio",
        # spelling corrected from "radionavidation"
        'LORC': "LORAN-C radionavigation",
        'GOES': "GOES UHF environment satellite",
        'GPS': "GPS UHF satellite positioning",
    }

    # Stratum value -> description (only the two values listed are mapped).
    STRATUM_TABLE = {
        0: "unspecified",
        1: "primary reference",
    }

    # Packet mode value -> description.
    MODE_TABLE = {
        0: "unspecified",
        1: "symmetric active",
        2: "symmetric passive",
        3: "client",
        4: "server",
        5: "broadcast",
        6: "reserved for NTP control messages",
        7: "reserved for private use",
    }

    # Leap indicator value -> description.
    LEAP_TABLE = {
        0: "no warning",
        1: "last minute has 61 seconds",
        2: "last minute has 59 seconds",
        3: "alarm condition (clock not synchronized)",
    }

class NTPPacket:
    """In-memory representation of one NTP packet.

    A packet can be serialised with to_data() and filled in from a received
    buffer with from_data().
    """

    # struct layout in network byte order: three unsigned bytes
    # (flags, stratum, poll), one signed byte (precision) and eleven
    # unsigned 32-bit words.
    _PACKET_FORMAT = "!B B B b 11I"

    def __init__(self, version=2, mode=3, tx_timestamp=0):
        """Create a packet with every field zeroed except those given.

        Parameters:
        version      -- NTP version
        mode         -- packet mode (client, server)
        tx_timestamp -- packet transmit timestamp
        """
        # flags byte components
        self.leap = 0                 # leap second indicator
        self.version = version
        self.mode = mode

        # remaining header fields
        self.stratum = 0
        self.poll = 0                 # poll interval
        self.precision = 0
        self.root_delay = 0
        self.root_dispersion = 0
        self.ref_id = 0               # reference clock identifier

        # timestamps; originate and transmit are additionally kept as raw
        # 32-bit high/low words
        self.ref_timestamp = 0
        self.orig_timestamp = 0
        self.orig_timestamp_high = self.orig_timestamp_low = 0
        self.recv_timestamp = 0
        self.tx_timestamp = tx_timestamp
        self.tx_timestamp_high = self.tx_timestamp_low = 0

    def to_data(self):
        """Serialise this packet into a buffer suitable for a socket.

        Returns:
        buffer representing this packet

        Raises:
        NTPException -- in case of invalid field
        """
        fields = (
            # leap (2 bits) | version (3 bits) | mode (3 bits)
            self.leap << 6 | self.version << 3 | self.mode,
            self.stratum,
            self.poll,
            self.precision,
            # root delay / dispersion: 16 integer bits, 16 fraction bits
            _to_int(self.root_delay) << 16 | _to_frac(self.root_delay, 16),
            _to_int(self.root_dispersion) << 16
            | _to_frac(self.root_dispersion, 16),
            self.ref_id,
            _to_int(self.ref_timestamp),
            _to_frac(self.ref_timestamp),
            # originate timestamp is sent from the stored raw words rather
            # than the float, to avoid loss of precision
            self.orig_timestamp_high,
            self.orig_timestamp_low,
            _to_int(self.recv_timestamp),
            _to_frac(self.recv_timestamp),
            _to_int(self.tx_timestamp),
            _to_frac(self.tx_timestamp),
        )
        try:
            return struct.pack(NTPPacket._PACKET_FORMAT, *fields)
        except struct.error:
            raise NTPException("Invalid NTP packet fields.")

    def from_data(self, data):
        """Fill this instance from a packet payload received from the network.

        Parameters:
        data -- buffer payload

        Raises:
        NTPException -- in case of invalid packet format
        """
        layout = NTPPacket._PACKET_FORMAT
        try:
            wire_size = struct.calcsize(layout)
            fields = struct.unpack(layout, data[0:wire_size])
        except struct.error:
            raise NTPException("Invalid NTP packet.")

        (flags, stratum, poll, precision,
         root_delay, root_dispersion, ref_id,
         ref_high, ref_low,
         orig_high, orig_low,
         recv_high, recv_low,
         tx_high, tx_low) = fields

        # split the flags byte back into its three components
        self.leap = flags >> 6 & 0x3
        self.version = flags >> 3 & 0x7
        self.mode = flags & 0x7

        self.stratum = stratum
        self.poll = poll
        self.precision = precision
        self.root_delay = float(root_delay) / 2**16
        self.root_dispersion = float(root_dispersion) / 2**16
        self.ref_id = ref_id

        self.ref_timestamp = _to_time(ref_high, ref_low)
        self.orig_timestamp = _to_time(orig_high, orig_low)
        self.orig_timestamp_high, self.orig_timestamp_low = orig_high, orig_low
        self.recv_timestamp = _to_time(recv_high, recv_low)
        self.tx_timestamp = _to_time(tx_high, tx_low)
        self.tx_timestamp_high, self.tx_timestamp_low = tx_high, tx_low

    def GetTxTimeStamp(self):
        """Return the raw transmit timestamp words as a (high, low) tuple."""
        raw_words = (self.tx_timestamp_high, self.tx_timestamp_low)
        return raw_words

    def SetOriginTimeStamp(self, high, low):
        """Store the raw originate timestamp words used by to_data()."""
        self.orig_timestamp_high, self.orig_timestamp_low = high, low

class RecvThread(threading.Thread):
    """Thread that receives datagrams and queues them for processing.

    Each datagram is stamped with its arrival time converted by
    system_to_ntp_time() and put on the module-level taskQueue as a
    (data, addr, recvTimestamp) tuple.
    """

    def __init__(self, socket):
        """Remember the socket to read from.

        Parameters:
        socket -- socket object to poll and receive from
        """
        threading.Thread.__init__(self)
        self.socket = socket

    def run(self):
        """Poll the socket until the module-level stopFlag becomes true."""
        while not stopFlag:
            # One-second select timeout so stopFlag is re-checked regularly.
            rlist, _, _ = select.select([self.socket], [], [], 1)
            if not rlist:
                continue
            print("Received %d packets" % len(rlist))
            for tempSocket in rlist:
                try:
                    data, addr = tempSocket.recvfrom(1024)
                except OSError as msg:
                    # Catch the builtin OSError (socket.error is an alias of
                    # it) so this handler does not depend on what the global
                    # name `socket` is bound to at runtime.
                    print(msg)
                    continue
                # Timestamp as close to reception as possible.
                recvTimestamp = system_to_ntp_time(time.time())
                taskQueue.put((data, addr, recvTimestamp))
        print("RecvThread Ended")

class WorkThread(threading.Thread):
    """Thread that answers queued NTP requests.

    Takes (data, addr, recvTimestamp) tuples from the module-level taskQueue,
    builds a server-mode reply and sends it back to addr.
    """

    def __init__(self, socket):
        """Remember the socket used to send replies.

        Parameters:
        socket -- socket object to send replies through
        """
        threading.Thread.__init__(self)
        self.socket = socket

    def run(self):
        """Serve queued requests until the module-level stopFlag becomes true."""
        while not stopFlag:
            try:
                # One-second timeout so stopFlag is re-checked regularly.
                data, addr, recvTimestamp = taskQueue.get(timeout=1)
            except queue.Empty:
                continue

            recvPacket = NTPPacket()
            try:
                recvPacket.from_data(data)
            except NTPException as err:
                # A malformed or truncated datagram must not kill the worker:
                # report it, drop it and keep serving.
                print(err)
                continue

            timeStamp_high, timeStamp_low = recvPacket.GetTxTimeStamp()
            sendPacket = NTPPacket(version=3, mode=4)
            sendPacket.stratum = 2
            sendPacket.poll = 10
            # precision, root_delay, root_dispersion and ref_id are left at
            # the NTPPacket defaults.
            sendPacket.ref_timestamp = recvTimestamp - 5
            # Echo the request's transmit timestamp words as our originate
            # timestamp.
            sendPacket.SetOriginTimeStamp(timeStamp_high, timeStamp_low)
            sendPacket.recv_timestamp = recvTimestamp
            sendPacket.tx_timestamp = system_to_ntp_time(time.time())
            try:
                self.socket.sendto(sendPacket.to_data(), addr)
            except (NTPException, OSError) as err:
                # A failed reply to one client should not stop the server.
                print(err)
                continue
            print("Sended to %s:%d" % (addr[0], addr[1]))
        print("WorkThread Ended")

# Address and port the server listens on (all interfaces, port 123).
listenIp = "0.0.0.0"
listenPort = 123

# The socket object is bound to its own name so that the imported `socket`
# module is not shadowed for the rest of the program.
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serverSocket.bind((listenIp, listenPort))
print("local socket: ", serverSocket.getsockname())

# One thread receives datagrams, the other builds and sends the replies;
# both share the same socket.
recvThread = RecvThread(serverSocket)
recvThread.start()
workThread = WorkThread(serverSocket)
workThread.start()

# Idle in the main thread until Ctrl-C, then shut down in order.
while True:
    try:
        time.sleep(0.5)
    except KeyboardInterrupt:
        print("Exiting...")
        # Both threads poll this flag and leave their run() loops.
        stopFlag = True
        recvThread.join()
        workThread.join()
        # Safe to close now that neither thread is using the socket.
        serverSocket.close()
        print("Exited")
        break
标签: 暂无
最后更新:2025-05-22

代号山岳

知之为知之 不知为不知

点赞
< 上一篇

COPYRIGHT © 2099 登峰造极境. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

蜀ICP备14031139号-5

川公网安备51012202000587号