登峰造极境

  • WIN
    • CSharp
    • JAVA
    • OAM
    • DirectX
    • Emgucv
  • UNIX
    • FFmpeg
    • QT
    • Python
    • Opencv
    • Openwrt
    • Twisted
    • Design Patterns
    • Mysql
    • Mycat
    • MariaDB
    • Make
    • OAM
    • Supervisor
    • Nginx
    • KVM
    • Docker
    • OpenStack
  • WEB
    • ASP
    • Node.js
    • PHP
    • Directadmin
    • Openssl
    • Regex
  • APP
    • Android
  • AI
    • Algorithm
    • Deep Learning
    • Machine Learning
  • IOT
    • Device
    • MSP430
  • DIY
    • Algorithm
    • Design Patterns
    • MATH
    • X98 AIR 3G
    • Tucao
    • fun
  • LIFE
    • 美食
    • 关于我
  • LINKS
  • ME
Claves
长风破浪会有时,直挂云帆济沧海
  1. 首页
  2. Platforms
  3. WINDOWS
  4. JAVA
  5. 正文

java21实现标准的NTP Server 时间授时服务器

2025-05-22

经过Windows/Linux 均测试通过:

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
        * NTP服务器实现类
 * 实现了RFC 5905定义的NTP协议服务器端功能
 */
public class NtpServer {
    // NTP相关常量定义
    private static final class NTP {
        // NTP时间从1900年开始,系统时间从1970年开始,计算两者差值(秒)
        // 计算1900年到1970年之间的秒数差(70年*365天+闰年天数)*86400秒
        private static final long NTP_DELTA = (LocalDate.of(1970, 1, 1).toEpochDay() -
                LocalDate.of(1900, 1, 1).toEpochDay()) * 86400L;

        // NTP数据包固定大小为48字节
        private static final int PACKET_SIZE = 48;
    }

    /**
            * NTP数据包类
     * 用于封装NTP协议数据包的结构和转换方法
     */
    private static class NtpPacket {
        // NTP数据包字段定义
        private byte leap;          // 闰秒指示器(2位): 0-无警告,1-最后1分钟有61秒,2-最后1分钟有59秒,3-时钟不同步
        private byte version;       // 版本号(3位): NTP协议版本,通常为3或4
        private byte mode;          // 模式(3位): 0-保留,1-对称主动,2-对称被动,3-客户端,4-服务器,5-广播,6-NTP控制消息,7-保留
        private byte stratum;       // 层级(8位): 0-未指定,1-主参考时钟,2-15-从参考时钟,16-255-保留
        private byte poll;          // 轮询间隔(8位): 连续报文之间的最大间隔,以秒为单位的2的幂
        private byte precision;     // 精度(8位): 系统时钟的精度,以秒为单位的2的幂
        private int rootDelay;      // 根延迟(32位): 到主参考时钟的往返延迟,以秒为单位的小数点后16位定点数
        private int rootDispersion; // 根离散(32位): 相对于主参考时钟的最大误差,以秒为单位的小数点后16位定点数
        private int refId;         // 参考时钟标识符(32位): 标识特定参考源
        private long refTimestamp;  // 参考时间戳(64位): 系统时钟最后一次被设置或校正的时间
        private long origTimestamp; // 原始时间戳(64位): 客户端发送请求的时间
        private long recvTimestamp; // 接收时间戳(64位): 服务器收到请求的时间
        private long txTimestamp;   // 发送时间戳(64位): 服务器发送响应的时间

        /**
                * 将系统时间(1970年基准)转换为NTP时间(1900年基准,64位格式)
                * @param timestamp 系统时间戳(秒)
         * @return NTP格式的64位时间戳
         */
        private static long systemToNtpTime(long timestamp) {
            // 高32位: 1900年以来的秒数
            // 低32位: 秒的小数部分,使用纳秒精度计算
            return ((timestamp + NTP.NTP_DELTA) << 32) |
                    ((System.nanoTime() % 1_000_000_000) * 0x100000000L / 1_000_000_000);
        }

        /**
                * 从字节数组解析NTP数据包
         * @param data 包含NTP数据包的字节数组
         */
        public void fromData(byte[] data) {
            // 使用大端序(网络字节序)解析数据
            ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);

            // 解析第一个字节(包含leap,version,mode)
            byte firstByte = buffer.get();
            this.leap = (byte) ((firstByte >> 6) & 0x3);  // 取高2位
            this.version = (byte) ((firstByte >> 3) & 0x7); // 取中间3位
            this.mode = (byte) (firstByte & 0x7);          // 取低3位

            // 解析其他字段
            this.stratum = buffer.get();       // 层级
            this.poll = buffer.get();          // 轮询间隔
            this.precision = buffer.get();     // 精度

            this.rootDelay = buffer.getInt();      // 根延迟
            this.rootDispersion = buffer.getInt(); // 根离散
            this.refId = buffer.getInt();         // 参考时钟标识符

            // 解析时间戳字段
            this.refTimestamp = buffer.getLong();  // 参考时间戳
            this.origTimestamp = buffer.getLong(); // 原始时间戳
            this.recvTimestamp = buffer.getLong(); // 接收时间戳
            this.txTimestamp = buffer.getLong();   // 发送时间戳
        }

        /**
                * 将NTP数据包转换为字节数组
         * @return 包含NTP数据包的字节数组
         */
        public byte[] toData() {
            // 分配48字节缓冲区,使用大端序
            ByteBuffer buffer = ByteBuffer.allocate(NTP.PACKET_SIZE).order(ByteOrder.BIG_ENDIAN);

            // 构造第一个字节(leap,version,mode)
            byte firstByte = (byte) ((leap << 6) | (version << 3) | mode);
            buffer.put(firstByte);

            // 写入其他字段
            buffer.put(stratum);      // 层级
            buffer.put(poll);         // 轮询间隔
            buffer.put(precision);    // 精度

            buffer.putInt(rootDelay);      // 根延迟
            buffer.putInt(rootDispersion); // 根离散
            buffer.putInt(refId);         // 参考时钟标识符

            // 写入时间戳字段
            buffer.putLong(refTimestamp);  // 参考时间戳
            buffer.putLong(origTimestamp); // 原始时间戳
            buffer.putLong(recvTimestamp); // 接收时间戳
            buffer.putLong(txTimestamp);   // 发送时间戳

            return buffer.array();
        }
    }

    /**
            * 接收线程类
     * 负责接收客户端发送的NTP请求数据包
     */
    private static class RecvThread extends Thread {
        private final DatagramSocket socket;       // UDP套接字
        private final BlockingQueue<Object[]> taskQueue; // 任务队列(用于与工作线程通信)
        private final AtomicBoolean stopFlag;      // 停止标志

        public RecvThread(DatagramSocket socket, BlockingQueue<Object[]> taskQueue, AtomicBoolean stopFlag) {
            this.socket = socket;
            this.taskQueue = taskQueue;
            this.stopFlag = stopFlag;
        }

        @Override
        public void run() {
            try {
                while (!stopFlag.get()) {
                    byte[] buffer = new byte[NTP.PACKET_SIZE];  // NTP数据包固定48字节
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                    // 设置接收超时1秒,避免无限等待
                    socket.setSoTimeout(1000);

                    try {
                        // 接收UDP数据包
                        socket.receive(packet);
                        System.out.println("Received packet from: " + packet.getAddress().getHostAddress());

                        // 记录接收时间戳(NTP格式)
                        long recvTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond());

                        // 将接收到的数据和客户端信息放入任务队列
                        // 数组元素: [数据包数据, 客户端地址, 客户端端口, 接收时间戳]
                        taskQueue.put(new Object[]{
                                packet.getData(),
                                packet.getAddress(),
                                packet.getPort(),
                                recvTimestamp
                        });
                    } catch (SocketTimeoutException e) {
                        // 超时是正常现象,继续循环检查停止标志
                    }
                }
            } catch (IOException | InterruptedException e) {
                // 如果不是因为停止标志导致的异常,打印错误
                if (!stopFlag.get()) {
                    e.printStackTrace();
                }
            }
            System.out.println("RecvThread Ended");
        }
    }

    /**
            * 工作线程类
     * 负责处理接收到的NTP请求并发送响应
     */
    private static class WorkThread extends Thread {
        private final DatagramSocket socket;       // UDP套接字
        private final BlockingQueue<Object[]> taskQueue; // 任务队列
        private final AtomicBoolean stopFlag;      // 停止标志

        public WorkThread(DatagramSocket socket, BlockingQueue<Object[]> taskQueue, AtomicBoolean stopFlag) {
            this.socket = socket;
            this.taskQueue = taskQueue;
            this.stopFlag = stopFlag;
        }

        @Override
        public void run() {
            try {
                while (!stopFlag.get()) {
                    try {
                        // 从任务队列获取请求,等待最多1秒
                        Object[] task = taskQueue.poll(1, TimeUnit.SECONDS);
                        if (task == null) continue;  // 超时或队列为空,继续循环

                        // 解析任务数据
                        byte[] data = (byte[]) task[0];          // NTP请求数据
                        InetAddress address = (InetAddress) task[1]; // 客户端地址
                        int port = (int) task[2];                // 客户端端口
                        long recvTimestamp = (long) task[3];     // 接收时间戳

                        // 解析接收到的NTP数据包
                        NtpPacket recvPacket = new NtpPacket();
                        recvPacket.fromData(data);

                        // 创建响应数据包
                        NtpPacket sendPacket = new NtpPacket();
                        sendPacket.version = 3;  // NTP版本3
                        sendPacket.mode = 4;     // 服务器模式
                        sendPacket.stratum = 2;  // 二级服务器(假设我们是从一级服务器同步的)
                        sendPacket.poll = 10;    // 轮询间隔(2^10=1024秒)

                        // 设置时间戳字段
                        sendPacket.refTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond() - 5); // 参考时间(5秒前)
                        sendPacket.origTimestamp = recvPacket.txTimestamp; // 原始时间戳=客户端发送时间戳
                        sendPacket.recvTimestamp = recvTimestamp;         // 接收时间戳=服务器接收时间
                        sendPacket.txTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond()); // 发送时间=当前时间

                        // 发送响应
                        byte[] responseData = sendPacket.toData();
                        DatagramPacket response = new DatagramPacket(responseData, responseData.length, address, port);
                        socket.send(response);

                        System.out.println("Sent to " + address.getHostAddress() + ":" + port);
                    } catch (InterruptedException e) {
                        // 如果不是因为停止标志导致的异常,打印错误
                        if (!stopFlag.get()) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (IOException e) {
                // 如果不是因为停止标志导致的异常,打印错误
                if (!stopFlag.get()) {
                    e.printStackTrace();
                }
            }
            System.out.println("WorkThread Ended");
        }
    }

    /**
            * 主方法
     * 启动NTP服务器
     * @param args 命令行参数
     */
    public static void main(String[] args) {
        String listenIp = "0.0.0.0";  // 监听所有网络接口
        int listenPort = 123;         // NTP标准端口
        AtomicBoolean stopFlag = new AtomicBoolean(false); // 停止标志
        BlockingQueue<Object[]> taskQueue = new LinkedBlockingQueue<>(); // 任务队列

        try (DatagramSocket socket = new DatagramSocket(listenPort, InetAddress.getByName(listenIp))) {
            System.out.println("Local socket: " + socket.getLocalSocketAddress());

            // 创建并启动接收线程和工作线程
            RecvThread recvThread = new RecvThread(socket, taskQueue, stopFlag);
            WorkThread workThread = new WorkThread(socket, taskQueue, stopFlag);

            recvThread.start();
            workThread.start();

            // 主线程循环检查中断信号
            while (!stopFlag.get()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // 收到中断信号,设置停止标志
                    stopFlag.set(true);
                    System.out.println("Exiting...");

                    // 等待线程结束
                    recvThread.join();
                    workThread.join();

                    System.out.println("Exited");
                    break;
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
标签: 暂无
最后更新:2025-05-22

代号山岳

知之为知之 不知为不知

点赞
< 上一篇

COPYRIGHT © 2099 登峰造极境. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

蜀ICP备14031139号-5

川公网安备51012202000587号