经过Windows/Linux 均测试通过:
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* NTP服务器实现类
* 实现了RFC 5905定义的NTP协议服务器端功能
*/
public class NtpServer {
// NTP相关常量定义
private static final class NTP {
// NTP时间从1900年开始,系统时间从1970年开始,计算两者差值(秒)
// 计算1900年到1970年之间的秒数差(70年*365天+闰年天数)*86400秒
private static final long NTP_DELTA = (LocalDate.of(1970, 1, 1).toEpochDay() -
LocalDate.of(1900, 1, 1).toEpochDay()) * 86400L;
// NTP数据包固定大小为48字节
private static final int PACKET_SIZE = 48;
}
/**
* NTP数据包类
* 用于封装NTP协议数据包的结构和转换方法
*/
private static class NtpPacket {
// NTP数据包字段定义
private byte leap; // 闰秒指示器(2位): 0-无警告,1-最后1分钟有61秒,2-最后1分钟有59秒,3-时钟不同步
private byte version; // 版本号(3位): NTP协议版本,通常为3或4
private byte mode; // 模式(3位): 0-保留,1-对称主动,2-对称被动,3-客户端,4-服务器,5-广播,6-NTP控制消息,7-保留
private byte stratum; // 层级(8位): 0-未指定,1-主参考时钟,2-15-从参考时钟,16-255-保留
private byte poll; // 轮询间隔(8位): 连续报文之间的最大间隔,以秒为单位的2的幂
private byte precision; // 精度(8位): 系统时钟的精度,以秒为单位的2的幂
private int rootDelay; // 根延迟(32位): 到主参考时钟的往返延迟,以秒为单位的小数点后16位定点数
private int rootDispersion; // 根离散(32位): 相对于主参考时钟的最大误差,以秒为单位的小数点后16位定点数
private int refId; // 参考时钟标识符(32位): 标识特定参考源
private long refTimestamp; // 参考时间戳(64位): 系统时钟最后一次被设置或校正的时间
private long origTimestamp; // 原始时间戳(64位): 客户端发送请求的时间
private long recvTimestamp; // 接收时间戳(64位): 服务器收到请求的时间
private long txTimestamp; // 发送时间戳(64位): 服务器发送响应的时间
/**
* 将系统时间(1970年基准)转换为NTP时间(1900年基准,64位格式)
* @param timestamp 系统时间戳(秒)
* @return NTP格式的64位时间戳
*/
private static long systemToNtpTime(long timestamp) {
// 高32位: 1900年以来的秒数
// 低32位: 秒的小数部分,使用纳秒精度计算
return ((timestamp + NTP.NTP_DELTA) << 32) |
((System.nanoTime() % 1_000_000_000) * 0x100000000L / 1_000_000_000);
}
/**
* 从字节数组解析NTP数据包
* @param data 包含NTP数据包的字节数组
*/
public void fromData(byte[] data) {
// 使用大端序(网络字节序)解析数据
ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);
// 解析第一个字节(包含leap,version,mode)
byte firstByte = buffer.get();
this.leap = (byte) ((firstByte >> 6) & 0x3); // 取高2位
this.version = (byte) ((firstByte >> 3) & 0x7); // 取中间3位
this.mode = (byte) (firstByte & 0x7); // 取低3位
// 解析其他字段
this.stratum = buffer.get(); // 层级
this.poll = buffer.get(); // 轮询间隔
this.precision = buffer.get(); // 精度
this.rootDelay = buffer.getInt(); // 根延迟
this.rootDispersion = buffer.getInt(); // 根离散
this.refId = buffer.getInt(); // 参考时钟标识符
// 解析时间戳字段
this.refTimestamp = buffer.getLong(); // 参考时间戳
this.origTimestamp = buffer.getLong(); // 原始时间戳
this.recvTimestamp = buffer.getLong(); // 接收时间戳
this.txTimestamp = buffer.getLong(); // 发送时间戳
}
/**
* 将NTP数据包转换为字节数组
* @return 包含NTP数据包的字节数组
*/
public byte[] toData() {
// 分配48字节缓冲区,使用大端序
ByteBuffer buffer = ByteBuffer.allocate(NTP.PACKET_SIZE).order(ByteOrder.BIG_ENDIAN);
// 构造第一个字节(leap,version,mode)
byte firstByte = (byte) ((leap << 6) | (version << 3) | mode);
buffer.put(firstByte);
// 写入其他字段
buffer.put(stratum); // 层级
buffer.put(poll); // 轮询间隔
buffer.put(precision); // 精度
buffer.putInt(rootDelay); // 根延迟
buffer.putInt(rootDispersion); // 根离散
buffer.putInt(refId); // 参考时钟标识符
// 写入时间戳字段
buffer.putLong(refTimestamp); // 参考时间戳
buffer.putLong(origTimestamp); // 原始时间戳
buffer.putLong(recvTimestamp); // 接收时间戳
buffer.putLong(txTimestamp); // 发送时间戳
return buffer.array();
}
}
/**
* 接收线程类
* 负责接收客户端发送的NTP请求数据包
*/
private static class RecvThread extends Thread {
private final DatagramSocket socket; // UDP套接字
private final BlockingQueue<Object[]> taskQueue; // 任务队列(用于与工作线程通信)
private final AtomicBoolean stopFlag; // 停止标志
public RecvThread(DatagramSocket socket, BlockingQueue<Object[]> taskQueue, AtomicBoolean stopFlag) {
this.socket = socket;
this.taskQueue = taskQueue;
this.stopFlag = stopFlag;
}
@Override
public void run() {
try {
while (!stopFlag.get()) {
byte[] buffer = new byte[NTP.PACKET_SIZE]; // NTP数据包固定48字节
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 设置接收超时1秒,避免无限等待
socket.setSoTimeout(1000);
try {
// 接收UDP数据包
socket.receive(packet);
System.out.println("Received packet from: " + packet.getAddress().getHostAddress());
// 记录接收时间戳(NTP格式)
long recvTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond());
// 将接收到的数据和客户端信息放入任务队列
// 数组元素: [数据包数据, 客户端地址, 客户端端口, 接收时间戳]
taskQueue.put(new Object[]{
packet.getData(),
packet.getAddress(),
packet.getPort(),
recvTimestamp
});
} catch (SocketTimeoutException e) {
// 超时是正常现象,继续循环检查停止标志
}
}
} catch (IOException | InterruptedException e) {
// 如果不是因为停止标志导致的异常,打印错误
if (!stopFlag.get()) {
e.printStackTrace();
}
}
System.out.println("RecvThread Ended");
}
}
/**
* 工作线程类
* 负责处理接收到的NTP请求并发送响应
*/
private static class WorkThread extends Thread {
private final DatagramSocket socket; // UDP套接字
private final BlockingQueue<Object[]> taskQueue; // 任务队列
private final AtomicBoolean stopFlag; // 停止标志
public WorkThread(DatagramSocket socket, BlockingQueue<Object[]> taskQueue, AtomicBoolean stopFlag) {
this.socket = socket;
this.taskQueue = taskQueue;
this.stopFlag = stopFlag;
}
@Override
public void run() {
try {
while (!stopFlag.get()) {
try {
// 从任务队列获取请求,等待最多1秒
Object[] task = taskQueue.poll(1, TimeUnit.SECONDS);
if (task == null) continue; // 超时或队列为空,继续循环
// 解析任务数据
byte[] data = (byte[]) task[0]; // NTP请求数据
InetAddress address = (InetAddress) task[1]; // 客户端地址
int port = (int) task[2]; // 客户端端口
long recvTimestamp = (long) task[3]; // 接收时间戳
// 解析接收到的NTP数据包
NtpPacket recvPacket = new NtpPacket();
recvPacket.fromData(data);
// 创建响应数据包
NtpPacket sendPacket = new NtpPacket();
sendPacket.version = 3; // NTP版本3
sendPacket.mode = 4; // 服务器模式
sendPacket.stratum = 2; // 二级服务器(假设我们是从一级服务器同步的)
sendPacket.poll = 10; // 轮询间隔(2^10=1024秒)
// 设置时间戳字段
sendPacket.refTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond() - 5); // 参考时间(5秒前)
sendPacket.origTimestamp = recvPacket.txTimestamp; // 原始时间戳=客户端发送时间戳
sendPacket.recvTimestamp = recvTimestamp; // 接收时间戳=服务器接收时间
sendPacket.txTimestamp = NtpPacket.systemToNtpTime(Instant.now().getEpochSecond()); // 发送时间=当前时间
// 发送响应
byte[] responseData = sendPacket.toData();
DatagramPacket response = new DatagramPacket(responseData, responseData.length, address, port);
socket.send(response);
System.out.println("Sent to " + address.getHostAddress() + ":" + port);
} catch (InterruptedException e) {
// 如果不是因为停止标志导致的异常,打印错误
if (!stopFlag.get()) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
// 如果不是因为停止标志导致的异常,打印错误
if (!stopFlag.get()) {
e.printStackTrace();
}
}
System.out.println("WorkThread Ended");
}
}
/**
* 主方法
* 启动NTP服务器
* @param args 命令行参数
*/
public static void main(String[] args) {
String listenIp = "0.0.0.0"; // 监听所有网络接口
int listenPort = 123; // NTP标准端口
AtomicBoolean stopFlag = new AtomicBoolean(false); // 停止标志
BlockingQueue<Object[]> taskQueue = new LinkedBlockingQueue<>(); // 任务队列
try (DatagramSocket socket = new DatagramSocket(listenPort, InetAddress.getByName(listenIp))) {
System.out.println("Local socket: " + socket.getLocalSocketAddress());
// 创建并启动接收线程和工作线程
RecvThread recvThread = new RecvThread(socket, taskQueue, stopFlag);
WorkThread workThread = new WorkThread(socket, taskQueue, stopFlag);
recvThread.start();
workThread.start();
// 主线程循环检查中断信号
while (!stopFlag.get()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// 收到中断信号,设置停止标志
stopFlag.set(true);
System.out.println("Exiting...");
// 等待线程结束
recvThread.join();
workThread.join();
System.out.println("Exited");
break;
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}