From 22444afd630b9ca474c97cc274a5883b67b08f72 Mon Sep 17 00:00:00 2001 From: v-yanb07 Date: Thu, 12 Nov 2020 20:02:10 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=B7=B1=E4=BF=AE=E6=94=B9=E7=9A=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/java0/nio01/HttpServer03.java | 14 +++- .../gateway/filter/HttpRequestFilterImpl.java | 14 ++++ .../gateway/inbound/HttpInboundHandler.java | 55 +++++++++------ .../httpclient4/HttpOutboundHandler.java | 3 + .../java/java0/conc0301/sync/Counter.java | 6 +- .../java0/conc0302/lock/ConditionDemo.java | 7 +- .../conc0302/lock/ReentrantLockDemo.java | 4 +- .../lock/ReentrantReadWriteLockDemo2.java | 5 ++ .../main/java/java0/conc0303/Homework03.java | 70 ++++++++++++++++--- .../collection/CopyOnWriteArrayListDemo.java | 4 +- .../collection/CopyOnWriteArrayListDemo1.java | 2 +- .../java0/conc0303/future/FutureTask1.java | 20 +++--- .../conc0303/stream/StreamParallelDemo.java | 8 ++- 13 files changed, 154 insertions(+), 58 deletions(-) create mode 100644 02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilterImpl.java diff --git a/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java b/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java index fad136a8..1e23cb88 100644 --- a/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java +++ b/02nio/nio01/src/main/java/java0/nio01/HttpServer03.java @@ -1,6 +1,7 @@ package java0.nio01; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; @@ -9,8 +10,8 @@ public class HttpServer03 { public static void main(String[] args) throws IOException{ - ExecutorService executorService = Executors.newFixedThreadPool(40); - final ServerSocket serverSocket = new ServerSocket(8803); + ExecutorService executorService = Executors.newFixedThreadPool(20); + final ServerSocket serverSocket = new ServerSocket(8088); while (true) { try { final Socket socket = serverSocket.accept(); @@ -23,11 +24,18 @@ public static void main(String[] args) throws IOException{ private static void service(Socket socket) { try { + InputStream in = socket.getInputStream(); + byte[] buf = new byte[1024]; + in.read(buf); + System.out.println("request from client " + socket.getInetAddress().getHostAddress()); + + System.out.println(new String(buf)); + Thread.sleep(20); PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); - String body = "hello,nio"; + String body = new String(buf); printWriter.println("Content-Length:" + body.getBytes().length); printWriter.println(); printWriter.write(body); diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilterImpl.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilterImpl.java new file mode 100644 index 00000000..4e9ea5d5 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilterImpl.java @@ -0,0 +1,14 @@ +package io.github.kimmking.gateway.filter; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; + +public class HttpRequestFilterImpl implements HttpRequestFilter { + + @Override + public void filter(FullHttpRequest fullRequest,ChannelHandlerContext ctx){ + fullRequest.headers().add("NIO","yanbing"); + + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java index 22fb2525..d2c1a2da 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java @@ -1,13 +1,24 @@ package io.github.kimmking.gateway.inbound; import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpUtil; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE; +import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.http.HttpHeaders.CONNECTION; + public class HttpInboundHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); @@ -44,28 +55,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } -// private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { -// FullHttpResponse response = null; -// try { -// String value = "hello,kimmking"; -// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); -// response.headers().set("Content-Type", "application/json"); -// response.headers().setInt("Content-Length", response.content().readableBytes()); -// -// } catch (Exception e) { -// logger.error("处理测试接口出错", e); -// response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); -// } finally { -// if (fullRequest != null) { -// if (!HttpUtil.isKeepAlive(fullRequest)) { -// ctx.write(response).addListener(ChannelFutureListener.CLOSE); -// } else { -// response.headers().set(CONNECTION, KEEP_ALIVE); -// ctx.write(response); -// } -// } -// } -// } + private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { + FullHttpResponse response = null; + try { + String value = "hello,kimmking"; + response = new DefaultFullHttpResponse (HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); + response.headers().set("Content-Type", "application/json"); + response.headers().setInt("Content-Length", response.content().readableBytes()); + + } catch (Exception e) { + logger.error("处理测试接口出错", e); + response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT); + } finally { + if (fullRequest != null) { + if (!HttpUtil.isKeepAlive(fullRequest)) { + ctx.write(response).addListener( ChannelFutureListener.CLOSE); + } else { + response.headers().set(CONNECTION, KEEP_ALIVE); + ctx.write(response); + } + } + } + } // // @Override // public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java index 856dc168..71b8714b 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java @@ -1,6 +1,7 @@ package io.github.kimmking.gateway.outbound.httpclient4; +import io.github.kimmking.gateway.filter.HttpRequestFilterImpl; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -63,10 +64,12 @@ private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext final HttpGet httpGet = new HttpGet(url); //httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE); httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpGet.setHeader("yanbing","headers"); httpclient.execute(httpGet, new FutureCallback() { @Override public void completed(final HttpResponse endpointResponse) { try { + new HttpRequestFilterImpl().filter(inbound,ctx); handleResponse(inbound, ctx, endpointResponse); } catch (Exception e) { e.printStackTrace(); diff --git a/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java b/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java index e60845a6..357e315c 100644 --- a/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java +++ b/03concurrency/0301/src/main/java/java0/conc0301/sync/Counter.java @@ -1,11 +1,11 @@ package java0.conc0301.sync; public class Counter { - + public final static int A=10; - + public static int B=10; - + private volatile int sum = 0; public synchronized void incr() { sum=sum+1; diff --git a/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java b/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java index 33ad0bab..2adf9956 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/lock/ConditionDemo.java @@ -16,10 +16,13 @@ public void put(Object x) throws InterruptedException { lock.lock(); try { // 当count等于数组的大小时,当前线程等待,直到notFull通知,再进行生产 - while (count == items.length) + while (count == items.length) { notFull.await(); + } items[putptr] = x; - if (++putptr == items.length) putptr = 0; + if (++putptr == items.length) { + putptr = 0; + } ++count; notEmpty.signal(); } finally { diff --git a/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantLockDemo.java b/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantLockDemo.java index d0ff5f98..e9de0447 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantLockDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantLockDemo.java @@ -6,7 +6,7 @@ public class ReentrantLockDemo { public static void main(String[] args) { final Count count = new Count(); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 5; i++) { new Thread() { public void run() { count.get(); @@ -14,7 +14,7 @@ public void run() { }.start(); } - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 4; i++) { new Thread() { public void run() { count.put(); diff --git a/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantReadWriteLockDemo2.java b/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantReadWriteLockDemo2.java index 82b4001b..8bc77250 100644 --- a/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantReadWriteLockDemo2.java +++ b/03concurrency/0301/src/main/java/java0/conc0302/lock/ReentrantReadWriteLockDemo2.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReentrantReadWriteLockDemo2 { @@ -12,7 +13,11 @@ public class ReentrantReadWriteLockDemo2 { private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); public Object readWrite(String key) { + ReentrantLock lock = new ReentrantLock(); + lock.lock(); Object value = null; + lock.unlock(); + System.out.println("1.首先开启读锁去缓存中取数据"); rwLock.readLock().lock(); try { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java b/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java index 3e64fc85..85d02150 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/Homework03.java @@ -1,5 +1,10 @@ package java0.conc0303; +import java0.conc0303.tool.CountDownLatchDemo; + +import java.util.Random; +import java.util.concurrent.*; + /** * 本周作业:(必做)思考有多少种方式,在main函数启动一个新线程或线程池, * 异步运行一个方法,拿到这个方法的返回值后,退出主线程? @@ -9,24 +14,69 @@ */ public class Homework03 { - public static void main(String[] args) { - + public static void main(String[] args) throws InterruptedException, ExecutionException{ long start=System.currentTimeMillis(); - // 在这里创建一个线程或线程池, // 异步执行 下面方法 - - int result = sum(); //这是得到的返回值 - +// Integer result = method01(); + +// Integer result = method02(); + +// Integer result = mothod03(); // 确保 拿到result 并输出 + Integer result = method04(); + System.out.println("异步计算结果为:"+result); - System.out.println("使用时间:"+ (System.currentTimeMillis()-start) + " ms"); - // 然后退出main线程 } - + + private static Integer method04() throws InterruptedException, ExecutionException{ + ExecutorService executor = Executors.newCachedThreadPool(); + Future call = executor.submit(new Callable() { + @Override + public Integer call() throws Exception { + return sum(); + } + }); + Integer result = call.get(); + executor.shutdown(); + return result; + } + + private static Integer mothod03() throws InterruptedException, ExecutionException{ + ExecutorService executor = Executors.newSingleThreadExecutor(); + FutureTask task = new FutureTask(new Callable() { + @Override + public Integer call() throws Exception { + return sum(); + } + }); + executor.submit(task); + return task.get(); + } + + private static Integer method02() throws InterruptedException, ExecutionException{ + FutureTask task = new FutureTask(new Callable() { + @Override + public Integer call() throws Exception { + return sum(); + } + }); + new Thread(task).start(); + return task.get(); + } + + + private static Integer method01(){ + return (Integer) CompletableFuture.supplyAsync(() -> { + return sum(); + }).join(); + } + + + private static int sum() { - return fibo(36); + return fibo(35); } private static int fibo(int a) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java index 9b305cbd..0a096c20 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo.java @@ -13,11 +13,11 @@ public static void main(String[] args) { // ArrayList,LinkedList,Vector不安全,运行报错 // why Vector 也不安全 // List list = new ArrayList(); -// List list = new LinkedList<>(); + List list = new LinkedList<>(); // List list = new Vector<>(); // 只有CopyOnWriteArrayList 安全,不报错 - List list = new CopyOnWriteArrayList(); +// List list = new CopyOnWriteArrayList(); for (int i = 0; i < 10000; i++) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java index b99bf181..72539cc2 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/collection/CopyOnWriteArrayListDemo1.java @@ -9,7 +9,7 @@ public class CopyOnWriteArrayListDemo1 { private static final int THREAD_POOL_MAX_NUM = 10; - //private List mList = new ArrayList(); // ArrayList 无法运行 +// private List mList = new ArrayList(); // ArrayList 无法运行 private List mList = new CopyOnWriteArrayList<>(); public static void main(String args[]) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java index 69499819..9d23675a 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java @@ -6,22 +6,22 @@ public class FutureTask1 { public static void main(String[] args) { //第一种方式 - FutureTask task = new FutureTask(new Callable() { - @Override - public Integer call() throws Exception { - return new Random().nextInt(); - } - }); - new Thread(task).start(); - //第二种方方式 -// ExecutorService executor = Executors.newSingleThreadExecutor(); // FutureTask task = new FutureTask(new Callable() { // @Override // public Integer call() throws Exception { // return new Random().nextInt(); // } // }); -// executor.submit(task); +// new Thread(task).start(); + //第二种方方式 + ExecutorService executor = Executors.newSingleThreadExecutor(); + FutureTask task = new FutureTask(new Callable() { + @Override + public Integer call() throws Exception { + return new Random().nextInt(); + } + }); + executor.submit(task); try { System.out.println("result: " + task.get()); diff --git a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java index 5b02350b..cb949c37 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/stream/StreamParallelDemo.java @@ -9,9 +9,10 @@ public class StreamParallelDemo { public static void main(String[] args) { + long start = System.currentTimeMillis(); List list = new ArrayList<>(); - IntStream.range(1, 10000).forEach(i -> list.add(i)); - BlockingQueue blockingQueue = new LinkedBlockingQueue(10000); + IntStream.range(1, 50000000).forEach(i -> list.add(i)); + BlockingQueue blockingQueue = new LinkedBlockingQueue(50000000); List longList = list.stream().parallel() .map(i -> i.longValue()) .sorted() @@ -27,7 +28,8 @@ public static void main(String[] args) { e.printStackTrace(); } }); - System.out.println("blockingQueue:" + blockingQueue.toString()); +// System.out.println("blockingQueue:" + blockingQueue.toString()); + System.out.println("time:" + (System.currentTimeMillis()-start)); }