JAVA NIO简单实现Socket Server

news/2024/7/3 8:44:29

为什么80%的码农都做不了架构师?>>>   hot3.png

使用JAVA NIO简单实现Socket Server

package com.flyer.cn.javaIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EchoServer {
	public static SelectorLoop connectionBell;
	public static SelectorLoop readBell;
	public boolean isReadBellRunning=false;
	private ExecutorService thdPool=Executors.newCachedThreadPool();

	public static void main(String[] args) throws IOException {
		new EchoServer().startServer();
	}
	
	// 启动服务器
	public void startServer() throws IOException {
		// 准备好一个闹钟.当有链接进来的时候响.
		connectionBell = new SelectorLoop();
		
		// 准备好一个闹装,当有read事件进来的时候响.
		readBell = new SelectorLoop();
		
		// 开启一个server channel来监听
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 开启非阻塞模式
		ssc.configureBlocking(false);
		
		ServerSocket socket = ssc.socket();
		socket.bind(new InetSocketAddress("localhost",7878));
		
		// 给闹钟规定好要监听报告的事件,这个闹钟只监听新连接事件.
		ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
		new Thread(connectionBell,"connectionBell").start();
	}
	
	// Selector轮询线程类
	public class SelectorLoop implements Runnable {
		private Selector selector;
		private ByteBuffer temp = ByteBuffer.allocate(1024);
		
		public SelectorLoop() throws IOException {
			this.selector = Selector.open();
		}
		
		public Selector getSelector() {
			return this.selector;
		}

		@Override
		public void run() {
			while(true) {
				try {
				    // 阻塞,只有当至少一个注册的事件发生的时候才会继续.
					this.selector.select();
					
					Set<SelectionKey> selectKeys = this.selector.selectedKeys();
					Iterator<SelectionKey> it = selectKeys.iterator();
					while (it.hasNext()) {
						SelectionKey key = it.next();
						it.remove();
						if (key.isAcceptable()) {
							// 这是一个connection accept事件, 并且这个事件是注册在serversocketchannel上的.
							ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
							// 接受一个连接.
							SocketChannel sc = ssc.accept();
							
							// 对新的连接的channel注册read事件. 使用readBell闹钟.
							sc.configureBlocking(false);
							sc.register(readBell.getSelector(), SelectionKey.OP_READ);
							System.out.println(" from client address:" + sc.getRemoteAddress());
							// 如果读取线程还没有启动,那就启动一个读取线程.
							synchronized(EchoServer.this) {
								if (!EchoServer.this.isReadBellRunning) {
									EchoServer.this.isReadBellRunning = true;
									new Thread(readBell,"readBell").start();
								}
							}
							
						} 
						else if (key.isReadable()){
							int IntLength=4;
					        int ObjLength; //有效数据长度
					        int readObj;//从NIO信道中读出的数据长度
					        ByteBuffer bbInt = ByteBuffer.allocate(4);    //读取INT头信息的缓存池
					        ByteBuffer bbObj = ByteBuffer.allocate(1024);     //读取OBJ有效数据的缓存池
							// 这是一个read事件,并且这个事件是注册在socketchannel上的.
							SocketChannel channel = (SocketChannel) key.channel();
							
					          //读出INT数据头
				           channel.read(bbInt);
				                //获取INT头中标示的有效数据长度信息并清空INT缓存池
				                ObjLength = bbInt.getInt(0);
				                bbInt.clear();

				                //清空有效数据缓存池设置有效缓存池的大小
				                bbObj.clear();
				                bbObj.limit(ObjLength);

				                //循环读满缓存池以保证数据完整性
				                readObj = channel.read(bbObj);
				                while (readObj != ObjLength) {
				                    readObj += channel.read(bbObj);
				                }
							
//							
//							// 写数据到buffer
//							int count = sc.read(temp);
//							if (count < 0) {
//								// 客户端已经断开连接.
//								key.cancel();
//								sc.close();
//								return;
//							}
//							// 切换buffer到读状态,内部指针归位.
							bbObj.flip();
							String msg = Charset.forName("UTF-8").decode(bbObj).toString();
							
//							System.out.println(ObjLength+":"+readObj+"  "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress());
							if(ObjLength!=readObj){
								System.out.println(ObjLength+":"+readObj+"  "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress());
							}
							// 清空buffer
							temp.clear();
					
						thdPool.submit(new Dispatch(channel,msg));
						}
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		
		public class Dispatch implements Runnable{
			private SocketChannel sc;
			private String msg;
			public Dispatch(SocketChannel _sc,String _msg){
				this.sc=_sc;
				this.msg=_msg;
			}

			public void run() {
				try{
//					Thread.sleep(500);
					msg=msg+"   "+new Date().toLocaleString();
					// echo back.
					sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
				}
			catch(Exception ex){
				ex.printStackTrace();
			}
			}
		}
		
		
	}

}

客户端

package com.flyer.cn.javaIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class Client implements Runnable {
	// 空闲计数器,如果空闲超过10次,将检测server是否中断连接.
	private String clientName;
	private static int idleCounter = 0;
	private Selector selector;
	private SocketChannel socketChannel;
	private ByteBuffer temp = ByteBuffer.allocate(1024);

	public static void main(String[] args) throws IOException {
		for(int i=0;i<100;i++){
			Client client= new Client("client"+i);
			new Thread(client).start();
			//client.sendFirstMsg();
		}
	}
	
	public Client(String name)   {
		try{
		this.clientName=name;
		// 同样的,注册闹钟.
		this.selector = Selector.open();
		
		// 连接远程server
		socketChannel = SocketChannel.open();
		// 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件.
		Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878));
		socketChannel.configureBlocking(false);
		SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
		
		if (isConnected) {
			this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
		} else {
			// 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件.
		    key.interestOps(SelectionKey.OP_CONNECT);
		}
		}
		catch(Exception ex){
			ex.printStackTrace();
		}
	}
	
	public void sendFirstMsg(SocketChannel socketChannel,String msg) throws IOException {
		int IntLength=4;
		ByteBuffer bb = ByteBuffer.allocate(1024);
        //构造发送数据:整型数据头+有效数据段
		byte[] arr = msg.getBytes(Charset.forName("UTF-8"));
        final int ObjLength = arr.length;   //获取有效数据段长度  
        bb.clear();
        bb.limit(IntLength + ObjLength);    //调整缓存池大小
        bb.putInt(ObjLength);
        bb.put(arr);
        bb.position(0);        
		socketChannel.write(bb);
	}

	@Override
	public void run() {
        while (true) {
			try {
				// 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量.
				int num = this.selector.select(1000);
				if (num ==0) {
					idleCounter ++;
					if(idleCounter >10) {
						// 如果server断开了连接,发送消息将失败.
						try {
						    this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
						} catch(ClosedChannelException e) {
							e.printStackTrace();
							this.socketChannel.close();
							return;
						}
					}
					continue;
				} else {
					idleCounter = 0;
				}
				Set<SelectionKey> keys = this.selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (key.isConnectable()) {
						// socket connected
						SocketChannel sc = (SocketChannel)key.channel();
						if (sc.isConnectionPending()) {
						    sc.finishConnect();
						}
						// send first message;
						this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
					}
					if (key.isReadable()) {
						// msg received.
						SocketChannel sc = (SocketChannel)key.channel();
						this.temp = ByteBuffer.allocate(1024);
						int count = sc.read(temp);
						if (count<0) {
							sc.close();
							continue;
						}
						// 切换buffer到读状态,内部指针归位.
						temp.flip();
						String msg = Charset.forName("UTF-8").decode(temp).toString();
						System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()+new Date().toLocaleString());
						
						Thread.sleep(1000);
						sendFirstMsg(sc,"Hello NIO.From "+this.clientName);
						// echo back.
//						sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
						
						// 清空buffer
						temp.clear();
					}
				}
			} catch (Exception e) {
				System.out.println("网络连接异常");
			}
		}
	}

}

转载于:https://my.oschina.net/u/1774673/blog/808674


http://www.niftyadmin.cn/n/2436249.html

相关文章

LeetCode-448. Find All Numbers Disappeared in an Array C#

Given an array of integers where 1 ≤ a[i] ≤ n (n size of array), some elements appear twice and others appear once. Find all the elements of [1, n] inclusive that do not appear in this array. Could you do it without extra space and in O(n) runtime? You…

MyBatisPlus 入门学习笔记(版本:3.4.3)

文章目录学习辅助资料MyBatisPlus概述1. MyBatisPlus是什么2. 特性快速开始1. 创建数据库 mybatis_plus2. 导入相关依赖3. 数据源配置3. 快速开始3.1 User实体类编写3.2 mapper编写3.3 启动类设置3.4 测试配置日志Mapper层自带的的CRUD方法1. Insert插入操作1.1 产生奇妙ID的原…

利用反射机制获取属性的值遇到的坑

类&#xff1a; public Class Test { public string name; public string value; } Test tnew Test(); t.name"abc"; t.value"123"; string str(string)t.GetType().GetProperty("name").GetValue(t,null); 找了3个小时&#xff0c;都找不出问题…

IDEA常用快捷键和Live Templates for Mac

1. IDEA常用快捷键 CmdShiftEnter&#xff1a;将输入的if&#xff0c;for&#xff0c;函数等等补上{}或者&#xff1b;使代码语句完整ShiftEnter&#xff1a;在当前行的下方开始新行OptEnter: 正则表达式验证CmdOptEnter&#xff1a;在当前行的上方插入新行OptEnter: 代码快速…

Qt 查找功能

版权声明该文章原创于Qter开源社区&#xff08;www.qter.org&#xff09;&#xff0c;作者yafeilinux&#xff0c;转载请注明出处&#xff01;导语这一篇我们来加上查找菜单的功能。因为要涉及Qt Creator的很多实用功能&#xff0c;所以单独用一篇文章来介绍。以前都用设计器设…

Git入门笔记

文章目录0. Git原理简述1. 设置用户签名 git config2. 初始化本地库 git init3. 查看本地库状态 git status4. 本地文件添加到暂存区 git add5. 暂存区的文件提交到本地库 git commit6. 查看历史版本 git reflog7. 修改文件后提交到本地库8. 版本穿梭9. Git分支9.1 查看分支9.2…

创建帧动画

Photoshop制作会跳动的文字动画效果和流动效果 --之心 新建一个大小适当文档&#xff0c;选择椭圆工具&#xff0c;按住Shift拉出一个正圆&#xff0c;然后锁定图层。选择渐变工具&#xff0c;将前景色与背景色分别设置为白色和任意深色。如下图中直线方向从上至下拖曳。得到小…

macOS IDEA等jetbrain全家桶,Clear Read-Only Status解决方法

问题 在编写代码的时候&#xff0c;代码因为只读而不让修改&#xff0c;并且跳出clear read-only status窗口。 主要原因在于&#xff0c;现在的macOS的权限设置导致编辑器没有权限修改代码。 解决办法 放开文件的写权限即可。 终端cd到项目的根目录后执行以下代码&#x…