diff --git a/02nio/nio01/src/main/java/java0/nio01/HttpClinet01.java b/02nio/nio01/src/main/java/java0/nio01/HttpClinet01.java new file mode 100644 index 00000000..1c4c2473 --- /dev/null +++ b/02nio/nio01/src/main/java/java0/nio01/HttpClinet01.java @@ -0,0 +1,29 @@ +package java0.nio01; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +public class HttpClinet01 { + public static void main(String[] args) throws IOException{ + Socket clientSocket = new Socket("127.0.0.1",8801); + client(clientSocket); + } + + private static void client(Socket socket) { + try { + // 获取服务端消息 + InputStream socketData = socket.getInputStream(); + //读取流数据 (socket返回的数据) + byte[] buf = new byte[1024]; + int len = 0; + while ((len = socketData.read(buf)) != -1) { + System.out.println(new String(buf, 0, len)); + } + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_1/HttpClient.java b/02nio/nio02/src/main/java/io/github/tn/week03_1/HttpClient.java new file mode 100644 index 00000000..6813fa53 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_1/HttpClient.java @@ -0,0 +1,80 @@ +package io.github.tn.week03_1; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + * 周四作业:整合你上次作业的httpclient/okhttp + * @author tn + * @version 1 + * @ClassName HttpClient + * @description 访问http + * @date 2020/10/27 21:25 + */ +public class HttpClient { + + + //server + public static void main(String[] args) throws IOException { + ExecutorService executorService = Executors.newFixedThreadPool(40); + final ServerSocket serverSocket = new ServerSocket(1212); + while (true) { + try { + final Socket socket = serverSocket.accept(); + executorService.execute(() -> service(socket)); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + //service + private static void service(Socket socket) { + try { + PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); + printWriter.println("HTTP/1.1 200 OK"); + printWriter.println("Content-Type:text/html;charset=utf-8"); + String body = requset(); + printWriter.println("Content-Length:" + body.getBytes().length); + printWriter.println(); + printWriter.write(body); + printWriter.close(); + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + // invoking + private static String requset(){ + HttpGet httpGet = new HttpGet("http://localhost:8801/test"); + try(CloseableHttpClient httpClient = HttpClients.createDefault(); + CloseableHttpResponse resp = httpClient.execute(httpGet) ) { + if(resp.getStatusLine().getStatusCode()==200){ + HttpEntity body = resp.getEntity(); + //使用工具类EntityUtils,从响应中取出实体表示的内容并转换成字符串 + String data = EntityUtils.toString(body, "utf-8"); + return data; + } + }catch (Exception e){ + System.err.println("接口调用失败"); + } + return "接口调用失败"; + } + + + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_2/TNettyServerApplication.java b/02nio/nio02/src/main/java/io/github/tn/week03_2/TNettyServerApplication.java new file mode 100644 index 00000000..8bfd414e --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_2/TNettyServerApplication.java @@ -0,0 +1,34 @@ +package io.github.tn.week03_2; + + +import io.github.tn.week03_2.inbound.HttpInboundServer; + +/** + * gateway 启动类 + */ +public class TNettyServerApplication { + + //项目名 + public final static String GATEWAY_NAME = "NIOGateway"; + //版本 + public final static String GATEWAY_VERSION = "1.0.0"; + + public static void main(String[] args) { + String proxyServer = System.getProperty("proxyServer","http://localhost:8801/test"); + String proxyPort = System.getProperty("proxyPort","8888"); + + // http://localhost:8888/api/hello ==> gateway API + // http://localhost:8088/api/hello ==> backend service + + int port = Integer.parseInt(proxyPort); + System.out.println(GATEWAY_NAME + " " + GATEWAY_VERSION +" starting..."); + // netty 启动 + HttpInboundServer server = new HttpInboundServer(port, proxyServer); + System.out.println(GATEWAY_NAME + " " + GATEWAY_VERSION +" started at http://localhost:" + port + " for server:" + proxyServer); + try { + server.run(); + }catch (Exception ex){ + ex.printStackTrace(); + } + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundHandler.java new file mode 100644 index 00000000..a99b0bd1 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundHandler.java @@ -0,0 +1,41 @@ +package io.github.tn.week03_2.inbound; + +import io.github.tn.week03_2.outbound.HttpOutboundHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpInboundHandler extends ChannelInboundHandlerAdapter { + + private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); + private final String proxyServer; + private HttpOutboundHandler handler; + + public HttpInboundHandler(String proxyServer) { + this.proxyServer = proxyServer; + handler = new HttpOutboundHandler(this.proxyServer); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + //logger.info("channelRead流量接口请求开始,时间为{}", startTime); + FullHttpRequest fullRequest = (FullHttpRequest) msg; + handler.requset(fullRequest, ctx); + } catch(Exception e) { + e.printStackTrace(); + } finally { + ReferenceCountUtil.release(msg); + } + } + + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundInitializer.java b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundInitializer.java new file mode 100644 index 00000000..dff3a40b --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundInitializer.java @@ -0,0 +1,24 @@ +package io.github.tn.week03_2.inbound; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +public class HttpInboundInitializer extends ChannelInitializer { + + private String proxyServer; + + public HttpInboundInitializer(String proxyServer) { + this.proxyServer = proxyServer; + } + + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(1024 * 1024)); + p.addLast(new HttpInboundHandler(this.proxyServer)); + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundServer.java b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundServer.java new file mode 100644 index 00000000..14cf047d --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_2/inbound/HttpInboundServer.java @@ -0,0 +1,57 @@ +package io.github.tn.week03_2.inbound; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HttpInboundServer { + private static Logger logger = LoggerFactory.getLogger(HttpInboundServer.class); + + private int port; + + private String proxyServer; + + public HttpInboundServer(int port, String proxyServer) { + this.port=port; + this.proxyServer = proxyServer; + } + + public void run() throws Exception { + + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(16); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_RCVBUF, 32 * 1024) + .option(ChannelOption.SO_SNDBUF, 32 * 1024) + .option(EpollChannelOption.SO_REUSEPORT, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpInboundInitializer(this.proxyServer)); + + Channel ch = b.bind(port).sync().channel(); + logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/'); + ch.closeFuture().sync(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_2/outbound/HttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_2/outbound/HttpOutboundHandler.java new file mode 100644 index 00000000..fae67ce6 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_2/outbound/HttpOutboundHandler.java @@ -0,0 +1,91 @@ +package io.github.tn.week03_2.outbound; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpUtil; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; + +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * @author tn + * @version 1 + * @ClassName HttpOutboundHandler + * @description httpClient + * @date 2020/10/31 14:59 + */ +public class HttpOutboundHandler { + private String backendUrl; + + public HttpOutboundHandler(String backendUrl) { + this.backendUrl = backendUrl; + } + + // invoking + public void requset(FullHttpRequest fullRequest, ChannelHandlerContext ctx){ + final String url = this.backendUrl + fullRequest.uri(); + invoking(fullRequest, ctx, url); + } + + + // invoking + private String invoking(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url){ + HttpGet httpGet = new HttpGet(url); + httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + try(CloseableHttpClient httpClient = HttpClients.createDefault(); + CloseableHttpResponse resp = httpClient.execute(httpGet) ) { + handleResponse(inbound, ctx, resp); + }catch (Exception e){ + System.err.println("接口调用失败"); + } + return "接口调用失败"; + } + + + // Response + private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final HttpResponse endpointResponse) throws Exception { + FullHttpResponse response = null; + try { + + byte[] body = EntityUtils.toByteArray(endpointResponse.getEntity()); + + response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body)); + response.headers().set("Content-Type", "application/json"); + response.headers().setInt("Content-Length", Integer.parseInt(endpointResponse.getFirstHeader("Content-Length").getValue())); + + } catch (Exception e) { + e.printStackTrace(); + response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); + exceptionCaught(ctx, e); + } finally { + if (fullRequest != null) { + if (!HttpUtil.isKeepAlive(fullRequest)) { + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + } else { + ctx.write(response); + } + } + ctx.flush(); + } + + } + + // 关流 + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/NettyServerApplication.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/NettyServerApplication.java new file mode 100644 index 00000000..6540df47 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/NettyServerApplication.java @@ -0,0 +1,28 @@ +package io.github.tn.week03_3; + + +import io.github.tn.week03_3.inbound.HttpInboundServer; + +public class NettyServerApplication { + + public final static String GATEWAY_NAME = "NIOGateway"; + public final static String GATEWAY_VERSION = "1.0.0"; + + public static void main(String[] args) { + String proxyServer = System.getProperty("proxyServer","http://localhost:8801"); + String proxyPort = System.getProperty("proxyPort","8888"); + + // http://localhost:8888/api/hello ==> gateway API + // http://localhost:8088/api/hello ==> backend service + + int port = Integer.parseInt(proxyPort); + System.out.println(GATEWAY_NAME + " " + GATEWAY_VERSION +" starting..."); + HttpInboundServer server = new HttpInboundServer(port, proxyServer); + System.out.println(GATEWAY_NAME + " " + GATEWAY_VERSION +" started at http://localhost:" + port + " for server:" + proxyServer); + try { + server.run(); + }catch (Exception ex){ + ex.printStackTrace(); + } + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilter.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilter.java new file mode 100644 index 00000000..a733054d --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilter.java @@ -0,0 +1,10 @@ +package io.github.tn.week03_3.filter; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; + +public interface HttpRequestFilter { + + void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx); + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilterImpl.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilterImpl.java new file mode 100644 index 00000000..4cf397a1 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/filter/HttpRequestFilterImpl.java @@ -0,0 +1,23 @@ +package io.github.tn.week03_3.filter; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; + +/** + * @author tn + * @version 1 + * @ClassName HttpRequestFilterimpl + * @description 拦截器 + * @date 2020/11/3 20:16 + */ +public class HttpRequestFilterImpl implements HttpRequestFilter { + + + @Override + public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { + //如果同名header已存在,则追加至原同名header后面 + fullRequest.headers().add("nio","tanning"); + //如果同名header已存在,则覆盖一个同名header。 +// fullRequest.headers().set("nio","tanning"); + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundHandler.java new file mode 100644 index 00000000..23799df1 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundHandler.java @@ -0,0 +1,80 @@ +package io.github.tn.week03_3.inbound; + +import io.github.tn.week03_3.filter.HttpRequestFilter; +import io.github.tn.week03_3.filter.HttpRequestFilterImpl; +import io.github.tn.week03_3.outbound.httpclient4.HttpOutboundHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpInboundHandler extends ChannelInboundHandlerAdapter { + + private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); + private final String proxyServer; + private HttpOutboundHandler handler; + private HttpRequestFilter requestFilter; + + public HttpInboundHandler(String proxyServer) { + this.proxyServer = proxyServer; + this.requestFilter = new HttpRequestFilterImpl(); + handler = new HttpOutboundHandler(this.proxyServer); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + //logger.info("channelRead流量接口请求开始,时间为{}", startTime); + FullHttpRequest fullRequest = (FullHttpRequest) msg; +// String uri = fullRequest.uri(); +// //logger.info("接收到的请求url为{}", uri); +// if (uri.contains("/test")) { +// handlerTest(fullRequest, ctx); +// } + requestFilter.filter(fullRequest,ctx); + handler.handle(fullRequest, ctx); + + } catch(Exception e) { + e.printStackTrace(); + } finally { + ReferenceCountUtil.release(msg); + } + } + +// private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { +// FullHttpResponse response = null; +// try { +// String value = "hello,kimmking"; +// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); +// response.headers().set("Content-Type", "application/json"); +// response.headers().setInt("Content-Length", response.content().readableBytes()); +// +// } catch (Exception e) { +// logger.error("处理测试接口出错", e); +// response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); +// } finally { +// if (fullRequest != null) { +// if (!HttpUtil.isKeepAlive(fullRequest)) { +// ctx.write(response).addListener(ChannelFutureListener.CLOSE); +// } else { +// response.headers().set(CONNECTION, KEEP_ALIVE); +// ctx.write(response); +// } +// } +// } +// } +// +// @Override +// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { +// cause.printStackTrace(); +// ctx.close(); +// } + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundInitializer.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundInitializer.java new file mode 100644 index 00000000..7d224a96 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundInitializer.java @@ -0,0 +1,28 @@ +package io.github.tn.week03_3.inbound; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +public class HttpInboundInitializer extends ChannelInitializer { + + private String proxyServer; + + public HttpInboundInitializer(String proxyServer) { + this.proxyServer = proxyServer; + } + + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); +// if (sslCtx != null) { +// p.addLast(sslCtx.newHandler(ch.alloc())); +// } + p.addLast(new HttpServerCodec()); + //p.addLast(new HttpServerExpectContinueHandler()); + p.addLast(new HttpObjectAggregator(1024 * 1024)); + p.addLast(new HttpInboundHandler(this.proxyServer)); + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundServer.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundServer.java new file mode 100644 index 00000000..ebb0d74f --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/inbound/HttpInboundServer.java @@ -0,0 +1,57 @@ +package io.github.tn.week03_3.inbound; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HttpInboundServer { + private static Logger logger = LoggerFactory.getLogger(HttpInboundServer.class); + + private int port; + + private String proxyServer; + + public HttpInboundServer(int port, String proxyServer) { + this.port=port; + this.proxyServer = proxyServer; + } + + public void run() throws Exception { + + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(16); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_RCVBUF, 32 * 1024) + .option(ChannelOption.SO_SNDBUF, 32 * 1024) + .option(EpollChannelOption.SO_REUSEPORT, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpInboundInitializer(this.proxyServer)); + + Channel ch = b.bind(port).sync().channel(); + logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/'); + ch.closeFuture().sync(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/HttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/HttpOutboundHandler.java new file mode 100644 index 00000000..42173f93 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/HttpOutboundHandler.java @@ -0,0 +1,138 @@ +package io.github.tn.week03_3.outbound.httpclient4; + + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpUtil; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; + +import java.util.concurrent.*; + +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class HttpOutboundHandler { + + private CloseableHttpAsyncClient httpclient; + private ExecutorService proxyService; + private String backendUrl; + + public HttpOutboundHandler(String backendUrl){ + this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl; + int cores = Runtime.getRuntime().availableProcessors() * 2; + long keepAliveTime = 1000; + int queueSize = 2048; + RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy(); + proxyService = new ThreadPoolExecutor(cores, cores, + keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), + new NamedThreadFactory("proxyService"), handler); + + IOReactorConfig ioConfig = IOReactorConfig.custom() + .setConnectTimeout(1000) + .setSoTimeout(1000) + .setIoThreadCount(cores) + .setRcvBufSize(32 * 1024) + .build(); + + httpclient = HttpAsyncClients.custom().setMaxConnTotal(40) + .setMaxConnPerRoute(8) + .setDefaultIOReactorConfig(ioConfig) + .setKeepAliveStrategy((response,context) -> 6000) + .build(); + httpclient.start(); + } + + public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) { + final String url = this.backendUrl + fullRequest.uri(); + proxyService.submit(()->fetchGet(fullRequest, ctx, url)); + } + + private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) { + final HttpGet httpGet = new HttpGet(url); + //httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE); + httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpclient.execute(httpGet, new FutureCallback() { + @Override + public void completed(final HttpResponse endpointResponse) { + try { + handleResponse(inbound, ctx, endpointResponse); + } catch (Exception e) { + e.printStackTrace(); + } finally { + + } + } + + @Override + public void failed(final Exception ex) { + httpGet.abort(); + ex.printStackTrace(); + } + + @Override + public void cancelled() { + httpGet.abort(); + } + }); + } + + private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final HttpResponse endpointResponse) throws Exception { + FullHttpResponse response = null; + try { +// String value = "hello,kimmking"; +// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); +// response.headers().set("Content-Type", "application/json"); +// response.headers().setInt("Content-Length", response.content().readableBytes()); + + + byte[] body = EntityUtils.toByteArray(endpointResponse.getEntity()); +// System.out.println(new String(body)); +// System.out.println(body.length); + + response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body)); + response.headers().set("Content-Type", "application/json"); + response.headers().setInt("Content-Length", Integer.parseInt(endpointResponse.getFirstHeader("Content-Length").getValue())); +// response.headers().set("nio","tanning"); +// for (Header e : endpointResponse.getAllHeaders()) { +// //response.headers().set(e.getName(),e.getValue()); +// System.out.println(e.getName() + " => " + e.getValue()); +// } + + } catch (Exception e) { + e.printStackTrace(); + response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); + exceptionCaught(ctx, e); + } finally { + if (fullRequest != null) { + if (!HttpUtil.isKeepAlive(fullRequest)) { + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + } else { + //response.headers().set(CONNECTION, KEEP_ALIVE); + ctx.write(response); + } + } + ctx.flush(); + //ctx.close(); + } + + } + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/NamedThreadFactory.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/NamedThreadFactory.java new file mode 100644 index 00000000..b395f6c1 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/httpclient4/NamedThreadFactory.java @@ -0,0 +1,32 @@ +package io.github.tn.week03_3.outbound.httpclient4; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private final String namePrefix; + private final boolean daemon; + + public NamedThreadFactory(String namePrefix, boolean daemon) { + this.daemon = daemon; + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + this.namePrefix = namePrefix; + } + + public NamedThreadFactory(String namePrefix) { + this(namePrefix, false); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + "-thread-" + threadNumber.getAndIncrement(), 0); + t.setDaemon(daemon); + return t; + } +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClient.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClient.java new file mode 100644 index 00000000..27c0bbf4 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClient.java @@ -0,0 +1,51 @@ +package io.github.tn.week03_3.outbound.netty4;//package io.github.kimmking.gateway.outbound; +// +//import io.netty.bootstrap.Bootstrap; +//import io.netty.channel.ChannelFuture; +//import io.netty.channel.ChannelInitializer; +//import io.netty.channel.ChannelOption; +//import io.netty.channel.EventLoopGroup; +//import io.netty.channel.nio.NioEventLoopGroup; +//import io.netty.channel.socket.SocketChannel; +//import io.netty.channel.socket.nio.NioSocketChannel; +//import io.netty.handler.codec.http.HttpRequestEncoder; +//import io.netty.handler.codec.http.HttpResponseDecoder; +// +//public class NettyHttpClient { +// public void connect(String host, int port) throws Exception { +// EventLoopGroup workerGroup = new NioEventLoopGroup(); +// +// try { +// Bootstrap b = new Bootstrap(); +// b.group(workerGroup); +// b.channel(NioSocketChannel.class); +// b.option(ChannelOption.SO_KEEPALIVE, true); +// b.handler(new ChannelInitializer() { +// @Override +// public void initChannel(SocketChannel ch) throws Exception { +// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 +// ch.pipeline().addLast(new HttpResponseDecoder()); +// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 +// ch.pipeline().addLast(new HttpRequestEncoder()); +// ch.pipeline().addLast(new HttpClientOutboundHandler()); +// } +// }); +// +// // Start the client. +// ChannelFuture f = b.connect(host, port).sync(); +// +// +// f.channel().write(request); +// f.channel().flush(); +// f.channel().closeFuture().sync(); +// } finally { +// workerGroup.shutdownGracefully(); +// } +// +// } +// +// public static void main(String[] args) throws Exception { +// NettyHttpClient client = new NettyHttpClient(); +// client.connect("127.0.0.1", 8844); +// } +//} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClientOutboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClientOutboundHandler.java new file mode 100644 index 00000000..9f58f510 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/netty4/NettyHttpClientOutboundHandler.java @@ -0,0 +1,22 @@ +package io.github.tn.week03_3.outbound.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class NettyHttpClientOutboundHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) + throws Exception { + + + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + + + + } +} \ No newline at end of file diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/okhttp/OkhttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/okhttp/OkhttpOutboundHandler.java new file mode 100644 index 00000000..16a65745 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/outbound/okhttp/OkhttpOutboundHandler.java @@ -0,0 +1,4 @@ +package io.github.tn.week03_3.outbound.okhttp; + +public class OkhttpOutboundHandler { +} diff --git a/02nio/nio02/src/main/java/io/github/tn/week03_3/router/HttpEndpointRouter.java b/02nio/nio02/src/main/java/io/github/tn/week03_3/router/HttpEndpointRouter.java new file mode 100644 index 00000000..62ceab38 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/tn/week03_3/router/HttpEndpointRouter.java @@ -0,0 +1,9 @@ +package io.github.tn.week03_3.router; + +import java.util.List; + +public interface HttpEndpointRouter { + + String route(List endpoints); + +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java b/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java index dd7d4375..19ad470c 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java @@ -1,7 +1,5 @@ package java0.conc0303; -import java.util.concurrent.CountDownLatch; - /** * 本周作业:(必做)思考有多少种方式,在main函数启动一个新线程或线程池, * 异步运行一个方法,拿到这个方法的返回值后,退出主线程? @@ -16,14 +14,14 @@ public static void main(String[] args) { long start=System.currentTimeMillis(); // 在这里创建一个线程或线程池, // 异步执行 下面方法 - + int result = sum(); //这是得到的返回值 // 确保 拿到result 并输出 System.out.println("异步计算结果为:"+result); - + System.out.println("使用时间:"+ (System.currentTimeMillis()-start) + " ms"); - + // 然后退出main线程 } diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java index 17028167..9b305cbd 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java @@ -14,10 +14,10 @@ public static void main(String[] args) { // why Vector 也不安全 // List list = new ArrayList(); // List list = new LinkedList<>(); - List list = new Vector<>(); +// List list = new Vector<>(); // 只有CopyOnWriteArrayList 安全,不报错 - //List list = new CopyOnWriteArrayList(); + List list = new CopyOnWriteArrayList(); for (int i = 0; i < 10000; i++) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java index 82718cdb..b99bf181 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java @@ -9,8 +9,8 @@ public class CopyOnWriteArrayListDemo1 { private static final int THREAD_POOL_MAX_NUM = 10; - private List mList = new ArrayList(); // ArrayList 无法运行 - //private List mList = new CopyOnWriteArrayList<>(); + //private List mList = new ArrayList(); // ArrayList 无法运行 + private List mList = new CopyOnWriteArrayList<>(); public static void main(String args[]) { new CopyOnWriteArrayListDemo1().start(); diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java index c5d1e900..282d695e 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/CompletableFutureDemo.java @@ -6,24 +6,25 @@ public class CompletableFutureDemo { public static void main(String[] args){ - // 1.变换结果 + // 1.变换结果 - 返回结果 System.out.println("=====>1.变换结果"); + // lambada String result1 = CompletableFuture.supplyAsync(()->{return "Hello ";}).thenApplyAsync(v -> v + "world").join(); System.out.println(result1); - // 2.消费 + // 2.消费 - 不返回结果 内部直接答应结果 CompletableFuture.supplyAsync(()->{return "Hello ";}).thenAccept(v -> { System.out.println("=====>2.消费");System.out.println("consumer: " + v);}); // 3.组合 System.out.println("=====>3.组合"); - String result3 = CompletableFuture.supplyAsync(()->{ + String result3 = CompletableFuture.supplyAsync(()->{//supplyAsync可以支持返回值 ,runAsync方法不支持返回值。 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello"; - }).thenCombine(CompletableFuture.supplyAsync(()->{ + }).thenCombine(CompletableFuture.supplyAsync(()->{// thenCombine合并任务 try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -32,10 +33,11 @@ public static void main(String[] args){ return "world"; }),(s1,s2)->{return s1 + " " + s2;}).join(); System.out.println("thenCombine:"+result3); - + + // 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化 + // thenAccept 消费处理结果 接收任务的处理结果,并消费处理,无返回结果。 CompletableFuture.supplyAsync(() -> "Hello, java course.") - .thenApply(String::toUpperCase).thenCompose(s -> CompletableFuture.supplyAsync(s::toLowerCase)) - .thenAccept(v -> { System.out.println("thenCompose:"+v);}); + .thenApply(String::toUpperCase).thenCompose(s -> CompletableFuture.supplyAsync(s::toLowerCase)).thenAccept(v -> { System.out.println("thenCompose:"+v);}); // 4.竞争 System.out.println("=====>4.竞争"); @@ -69,7 +71,7 @@ public static void main(String[] args){ } return "Hi Boy"; - }).exceptionally(e->{ // Fluent API + }).exceptionally(e->{ System.out.println(e.getMessage()); return "Hello world!"; }).join(); diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureDemo1.java b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureDemo1.java index 14a16383..6945b4d8 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureDemo1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureDemo1.java @@ -3,15 +3,21 @@ import java.util.Random; import java.util.concurrent.*; +//方法回调 public class FutureDemo1 { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Future result = executor.submit(new Callable() { public Integer call() throws Exception { - return new Random().nextInt(); +// return new Random().nextInt(); + int numberInt = new Random().nextInt(); + System.out.println(numberInt); + return numberInt; + } }); executor.shutdown(); + // try 超时处理 try { System.out.println("result:" + result.get()); } catch (InterruptedException e) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java index 69499819..793c846d 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java @@ -2,7 +2,7 @@ import java.util.Random; import java.util.concurrent.*; - +//线程回调 public class FutureTask1 { public static void main(String[] args) { //第一种方式 diff --git a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java index aaf9567d..5b02350b 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java @@ -19,7 +19,7 @@ public static void main(String[] args) { // // 串行,单线程 // longList.stream().forEach( // 并行,默认使用CPU * 2个线程 - longList.stream().forEach( + longList.stream().parallel().forEach( i -> { try { blockingQueue.put(i); diff --git a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelTest.java b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelTest.java new file mode 100644 index 00000000..c5c6c48e --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelTest.java @@ -0,0 +1,61 @@ +package java0.conc0303.stream; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * parallel 不适合IO密集型 (多线程处理) + * @author tn + * @version 1 + * @ClassName StreamParallelTest + * @description + * @date 2020/11/7 22:24 + */ +public class StreamParallelTest { + + public static void main(String[] args) { + List list = new ArrayList<>(); + BlockingQueue blockingQueue = new LinkedBlockingQueue(10000); + //随机设置值 + IntStream.range(1, 10000).forEach(i -> list.add(i)); + System.out.println("Integer:"+list.toString()); + + +// List longList = list.stream().parallel() +// .map(i -> i.longValue()) +// .sorted() +// .collect(Collectors.toList()); + List longList = list.stream().map(i -> i.longValue()).sorted().collect(Collectors.toList()); + long start=System.currentTimeMillis(); + +// 并行,默认使用CPU * 2个线程 +// longList.stream().parallel().forEach( +// i -> { +// try { +// blockingQueue.put(i); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// }); + longList.stream().forEach( + i -> { + try { + blockingQueue.put(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + System.out.println("blockingQueue:" + blockingQueue.toString()); + + System.out.println("使用时间:"+ (System.currentTimeMillis()-start) + " ms"); + + System.out.println("longList:"+longList.toString()); + +// BlockingQueue blockingQueue = new LinkedBlockingQueue(10000); + } + +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo.java index 66cd7406..b93731be 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo.java @@ -5,7 +5,7 @@ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); - for(int i=0;i<10;i++){ + for(int i=0;i<5;i++){ new Thread(new readNum(i,countDownLatch)).start(); } countDownLatch.await(); // 注意跟CyclicBarrier不同,这里在主线程await diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo2.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo2.java index 62a1007a..7672d30b 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo2.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/CountDownLatchDemo2.java @@ -20,17 +20,19 @@ public static void main(String[] args) throws Exception { exec.execute(() -> { try { test(threadNum); - //countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } finally { + //确保contDown, test() 报错不影响其他 countDownLatch.countDown(); } }); } countDownLatch.await(); System.out.println("==>所有程序员完成任务,项目顺利上线!"); - //exec.shutdown(); + // 关闭线程池中 当前使用的线程 + // 必须写,可能会阻塞线程池中的线程,造成线程一直等待工作 + exec.shutdown(); } private static void test(int threadNum) throws Exception { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo.java index fd7b8aa6..dc4958c7 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo.java @@ -4,21 +4,18 @@ public class CyclicBarrierDemo { public static void main(String[] args) throws InterruptedException { - CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { + CyclicBarrier cyclicBarrier = new CyclicBarrier(100, new Runnable() { @Override public void run() { System.out.println("回调>>"+Thread.currentThread().getName()); System.out.println("回调>>线程组执行结束"); - System.out.println("==>各个子线程执行结束。。。。"); } }); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 100; i++) { new Thread(new readNum(i,cyclicBarrier)).start(); } - - // ==>>> - - + + System.out.println("==>各个子线程执行结束。。。。"); System.out.println("==>主线程执行结束。。。。"); //CyclicBarrier 可以重复利用, diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java index 33e88260..32d1d3c2 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/CyclicBarrierDemo2.java @@ -11,18 +11,94 @@ public static void main(String[] args) { for(int i=0;i CyclicBarrier重用"); + System.out.println("CyclicBarrier重用 -------------------- 1 "); for(int i=0;i