From 739e83e16d49479f79c48ee6a837b15b75f973b4 Mon Sep 17 00:00:00 2001 From: tangtian Date: Thu, 12 Nov 2020 21:25:54 +0800 Subject: [PATCH 1/3] mearge --- .../0301/src/main/java/META-INF/MANIFEST.MF | 3 ++ .../java0/conc0303/future/FutureTask1.java | 20 +++---- .../java0/conc0303/homework3/Homework03.java | 37 +++++++++++++ .../homework3/byFuture/FutureTaskExample.java | 36 +++++++++++++ .../byThreadpool/FutureTaskExample.java | 44 ++++++++++++++++ .../CountDownLatchExample.java | 39 ++++++++++++++ .../CyclicBarrierExample.java | 48 +++++++++++++++++ .../byconcurrentTool/SemaphoreExample.java | 40 ++++++++++++++ .../byThreadpool/CountDownLatchExample.java | 49 ++++++++++++++++++ .../byThreadpool/CyclicBarrierExample.java | 48 +++++++++++++++++ .../byThreadpool/SemaphoreExample.java | 47 +++++++++++++++++ .../java0/conc0303/tool/SemaphoreDemo.java | 6 +-- .../META-INF/JavaCourseCodes.kotlin_module | Bin 0 -> 16 bytes .../JavaCourseCodes/META-INF/MANIFEST.MF | 3 ++ ...7\345\242\203\345\207\206\345\244\207.txt" | 38 ++++++++++++++ 15 files changed, 445 insertions(+), 13 deletions(-) create mode 100644 03concurrency/0301/src/main/java/META-INF/MANIFEST.MF create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/Homework03.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/FutureTaskExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/byThreadpool/FutureTaskExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CountDownLatchExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CyclicBarrierExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/SemaphoreExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CountDownLatchExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CyclicBarrierExample.java create mode 100644 03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/SemaphoreExample.java create mode 100644 out/production/JavaCourseCodes/META-INF/JavaCourseCodes.kotlin_module create mode 100644 out/production/JavaCourseCodes/META-INF/MANIFEST.MF create mode 100644 "out/production/JavaCourseCodes/\347\216\257\345\242\203\345\207\206\345\244\207.txt" diff --git a/03concurrency/0301/src/main/java/META-INF/MANIFEST.MF b/03concurrency/0301/src/main/java/META-INF/MANIFEST.MF new file mode 100644 index 00000000..ef62bb7b --- /dev/null +++ b/03concurrency/0301/src/main/java/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: + diff --git a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java index 69499819..d799563c 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/future/FutureTask1.java @@ -13,16 +13,16 @@ public Integer call() throws Exception { } }); new Thread(task).start(); - //第二种方方式 -// ExecutorService executor = Executors.newSingleThreadExecutor(); -// FutureTask task = new FutureTask(new Callable() { -// @Override -// public Integer call() throws Exception { -// return new Random().nextInt(); -// } -// }); -// executor.submit(task); - + //第二种方方式// ExecutorService executor = Executors.newSingleThreadExecutor(); + //// FutureTask task = new FutureTask(new Callable() { + //// @Override + //// public Integer call() throws Exception { + //// return new Random().nextInt(); + //// } + //// }); + //// executor.submit(task); + // + try { System.out.println("result: " + task.get()); } catch (InterruptedException e) { diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/Homework03.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/Homework03.java new file mode 100644 index 00000000..50075bce --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/Homework03.java @@ -0,0 +1,37 @@ +package java0.conc0303.homework3; + +/** + * 本周作业:(必做)思考有多少种方式,在main函数启动一个新线程或线程池, + * 异步运行一个方法,拿到这个方法的返回值后,退出主线程? + * 写出你的方法,越多越好,提交到github。 + * + * 一个简单的代码参考: + */ +public class Homework03 { + + public static void main(String[] args) { + + long start=System.currentTimeMillis(); + // 在这里创建一个线程或线程池, + // 异步执行 下面方法 + + int result = sum(); //这是得到的返回值 + + // 确保 拿到result 并输出 + System.out.println("异步计算结果为:"+result); + + System.out.println("使用时间:"+ (System.currentTimeMillis()-start) + " ms"); + + // 然后退出main线程 + } + + private static int sum() { + return fibo(36); + } + + private static int fibo(int a) { + if ( a < 2) + return 1; + return fibo(a-1) + fibo(a-2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/FutureTaskExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/FutureTaskExample.java new file mode 100644 index 00000000..ede0f658 --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/FutureTaskExample.java @@ -0,0 +1,36 @@ +package java0.conc0303.homework3.byFuture; + + + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +/** + * @author tangtian + * @version 1.0 + * @className FutureExample + * @description + * @date 2020/11/9 8:48 AM + **/ +public class FutureTaskExample { + public static void main(String[] args) throws ExecutionException, InterruptedException { + FutureTask task = new FutureTask(new Callable() { + @Override + public Integer call() throws Exception { + return sum(); + } + }); + new Thread(task).start(); + System.out.println(task.get()); + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/byThreadpool/FutureTaskExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/byThreadpool/FutureTaskExample.java new file mode 100644 index 00000000..74e741ae --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byFuture/byThreadpool/FutureTaskExample.java @@ -0,0 +1,44 @@ +package java0.conc0303.homework3.byFuture.byThreadpool; + + + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * @author tangtian + * @version 1.0 + * @className FutureExample + * @description + * @date 2020/11/9 8:48 AM + **/ +public class FutureTaskExample { + public static void main(String[] args) throws ExecutionException, InterruptedException { + ExecutorService executor = Executors.newCachedThreadPool(); + Future result = executor.submit(new Callable() { + public Integer call() throws Exception { + return sum(); + } + }); + executor.shutdown(); + try { + System.out.println("result:" + result.get()); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CountDownLatchExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CountDownLatchExample.java new file mode 100644 index 00000000..df87e311 --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CountDownLatchExample.java @@ -0,0 +1,39 @@ +package java0.conc0303.homework3.byconcurrentTool; + + +import java.util.concurrent.CountDownLatch; + +/** + * @author tangtian + * @version 1.0 + * @className text1 + * @description countDownLatch + * @date 2020/11/9 6:26 AM + **/ +public class CountDownLatchExample { + private final static int threadCount = 20; + public static void main(String[] args) throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(threadCount); + long start=System.currentTimeMillis(); + for (int i = 0; i< threadCount; i++){ + new Thread(new Runnable() { + @Override + public void run() { + System.out.println(sum());//开始执行任务 + countDownLatch.countDown(); + } + }).start(); + } + countDownLatch.await(); + System.out.println("使用时间:"+ (System.currentTimeMillis()-start) + " ms"); + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CyclicBarrierExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CyclicBarrierExample.java new file mode 100644 index 00000000..33319194 --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/CyclicBarrierExample.java @@ -0,0 +1,48 @@ +package java0.conc0303.homework3.byconcurrentTool; + + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +/** + * @author tangtian + * @version 1.0 + * @className text1 + * @description CyclicBarrier + * @date 2020/11/9 6:26 AM + **/ +public class CyclicBarrierExample { + private static CyclicBarrier barrier=new CyclicBarrier(2,()-> System.out.println("currentResult")); + public static void main(String[] args) throws InterruptedException { + for (int i = 0; i< 10; i++){ + new Thread(new Runnable() { + @Override + public void run() { + try { + //等待线程准备好 + System.out.println(Thread.currentThread().getName() + ":ready"); + //开始执行任务 + System.out.println(Thread.currentThread().getName() + "--result:" + sum()); + //等待处理结果 + barrier.await(); + System.out.println("continue"); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + }).start(); + } + } + private static int sum() throws BrokenBarrierException, InterruptedException { + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/SemaphoreExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/SemaphoreExample.java new file mode 100644 index 00000000..b1c1849d --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/SemaphoreExample.java @@ -0,0 +1,40 @@ +package java0.conc0303.homework3.byconcurrentTool; + + +import java.util.concurrent.Semaphore; + +/** + * @author tangtian + * @version 1.0 + * @className Semaphore + * @description + * @date 2020/11/9 8:27 AM + **/ +public class SemaphoreExample { + private final static int threadCount = 20; + + public static void main(String[] args) { + final Semaphore semaphore = new Semaphore(10); + for (int i = 0; i < threadCount; i++){ + new Thread(() -> { + try { + semaphore.acquire();//获取许可 + System.out.println(sum());//开始执行任务 + semaphore.release();//释放个许可 + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + } + + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CountDownLatchExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CountDownLatchExample.java new file mode 100644 index 00000000..46d6300c --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CountDownLatchExample.java @@ -0,0 +1,49 @@ +package java0.conc0303.homework3.byconcurrentTool.byThreadpool; + + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author tangtian + * @version 1.0 + * @className text1 + * @description countDownLatch + * @date 2020/11/9 6:26 AM + **/ +public class CountDownLatchExample { + private final static int threadCount = 200; + + public static void main(String[] args) throws Exception { + + ExecutorService exec = Executors.newCachedThreadPool(); + + final CountDownLatch countDownLatch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + final int threadNum = i; + exec.execute(() -> { + try { + sum(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + countDownLatch.countDown(); + } + }); + } + countDownLatch.await(); + System.out.println("程序执行完毕"); + exec.shutdown(); + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CyclicBarrierExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CyclicBarrierExample.java new file mode 100644 index 00000000..ba1fb6e9 --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/CyclicBarrierExample.java @@ -0,0 +1,48 @@ +package java0.conc0303.homework3.byconcurrentTool.byThreadpool; + + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +/** + * @author tangtian + * @version 1.0 + * @className text1 + * @description CyclicBarrier + * @date 2020/11/9 6:26 AM + **/ +public class CyclicBarrierExample { + private static CyclicBarrier barrier=new CyclicBarrier(2,()-> System.out.println("currentResult")); + public static void main(String[] args) throws InterruptedException { + for (int i = 0; i< 10; i++){ + new Thread(new Runnable() { + @Override + public void run() { + try { + //等待线程准备好 + System.out.println(Thread.currentThread().getName() + ":ready"); + //开始执行任务 + System.out.println(Thread.currentThread().getName() + "--result:" + sum()); + //等待处理结果 + barrier.await(); + System.out.println("continue"); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + }).start(); + } + } + private static int sum() throws BrokenBarrierException, InterruptedException { + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/SemaphoreExample.java b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/SemaphoreExample.java new file mode 100644 index 00000000..e6da66f9 --- /dev/null +++ b/03concurrency/0301/src/main/java/java0/conc0303/homework3/byconcurrentTool/byThreadpool/SemaphoreExample.java @@ -0,0 +1,47 @@ +package java0.conc0303.homework3.byconcurrentTool.byThreadpool; + + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + +/** + * @author tangtian + * @version 1.0 + * @className Semaphore + * @description + * @date 2020/11/9 8:27 AM + **/ +public class SemaphoreExample { + private final static int threadCount = 20; + + public static void main(String[] args) throws Exception { + + ExecutorService exec = Executors.newCachedThreadPool(); + + final Semaphore semaphore = new Semaphore(3); + + for (int i = 0; i < threadCount; i++) { + final int threadNum = i; + exec.execute(() -> { + try { + semaphore.acquire(3); // 获取全部许可,退化成串行执行 + sum(); + semaphore.release(3); // 释放多个许可 + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + exec.shutdown(); + } + private static int sum(){ + return fibo(39); + } + private static int fibo(int a){ + if (a < 2){ + return 1; + } + return fibo(a-1) + fibo(a - 2); + } +} diff --git a/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo.java b/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo.java index f811c626..697c443a 100644 --- a/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo.java +++ b/03concurrency/0301/src/main/java/java0/conc0303/tool/SemaphoreDemo.java @@ -1,14 +1,14 @@ package java0.conc0303.tool; import java.util.concurrent.Semaphore; +import java.util.stream.IntStream; public class SemaphoreDemo { public static void main(String[] args) { int N = 8; //工人数 - Semaphore semaphore = new Semaphore(5); //机器数目 - for (int i = 0; i < N; i++) - new Worker(i, semaphore).start(); + Semaphore semaphore = new Semaphore(3); //机器数目 + IntStream.range(0, N).forEach(i -> new Worker(i, semaphore).start()); } static class Worker extends Thread { diff --git a/out/production/JavaCourseCodes/META-INF/JavaCourseCodes.kotlin_module b/out/production/JavaCourseCodes/META-INF/JavaCourseCodes.kotlin_module new file mode 100644 index 0000000000000000000000000000000000000000..a49347afef10a9b5f95305e1058ba36adec7d6dd GIT binary patch literal 16 RcmZQzU|?ooU|@t|0RRA102TlM literal 0 HcmV?d00001 diff --git a/out/production/JavaCourseCodes/META-INF/MANIFEST.MF b/out/production/JavaCourseCodes/META-INF/MANIFEST.MF new file mode 100644 index 00000000..ef62bb7b --- /dev/null +++ b/out/production/JavaCourseCodes/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: + diff --git "a/out/production/JavaCourseCodes/\347\216\257\345\242\203\345\207\206\345\244\207.txt" "b/out/production/JavaCourseCodes/\347\216\257\345\242\203\345\207\206\345\244\207.txt" new file mode 100644 index 00000000..a1458b85 --- /dev/null +++ "b/out/production/JavaCourseCodes/\347\216\257\345\242\203\345\207\206\345\244\207.txt" @@ -0,0 +1,38 @@ + + +## Windows + +1.管理员身份打开powershell + +2.运行 +Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072; iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1')) + +3.执行choco install superbenchmarker + +4.输入 sb + +执行 sb -u http://localhost:8088/api/hello -c 20 -N 60 + +## Mac + +1.执行brew install wrk +如果显式brew update很慢,可以ctrl+C打断更新 + +2.输入 wrk + +执行 wrk -t8 -c40 -d60s http://localhost:8088/api/hello + +## 压测程序 + +1.可以从github获取 +git clone https://github.com/kimmking/atlantis +cd atlantis\gateway-server +mvn clean package +然后在target目录可以找到gateway-server-0.0.1-SNAPSHOT.jar + +2.也可以从此处下载已经编译好的: +链接:https://pan.baidu.com/s/1NbpYX4M3YKLYM1JJeIzgSQ +提取码:sp85 + +java -jar -Xmx512m -Xms512 gateway-server-0.0.1-SNAPSHOT.jar + From 6186f941ed1417f7e4334a375a7a6ff413227250 Mon Sep 17 00:00:00 2001 From: tangtian8 <765502922@qq.com> Date: Tue, 5 Jan 2021 07:47:08 +0800 Subject: [PATCH 2/3] 1 --- 07rpc/rpc01/rpcfx-core/pom.xml | 10 ++- .../rpcfx/annotation/RpcfxService.java | 17 ++++ .../io/kimmking/rpcfx/api/RpcfxResolver.java | 2 + .../java/io/kimmking/rpcfx/client/Rpcfx.java | 52 ++++++++----- .../rpcfx/exception/RpcException.java | 78 +++++++++++++++++++ .../kimmking/rpcfx/server/RpcfxInvoker.java | 22 +++--- .../io/kimmking/rpcfx/utils/HttpUtils.java | 68 ++++++++++++++++ .../demo/consumer/RpcfxClientApplication.java | 4 +- 07rpc/rpc01/rpcfx-demo-provider/pom.xml | 7 +- .../rpcfx/demo/provider/DemoResolver.java | 28 +++++-- .../provider/HttpMessageConverterConfig.java | 37 +++++++++ .../demo/provider/RpcfxServerApplication.java | 24 +++--- 12 files changed, 296 insertions(+), 53 deletions(-) create mode 100644 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java create mode 100644 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/exception/RpcException.java create mode 100644 07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/HttpUtils.java create mode 100644 07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/HttpMessageConverterConfig.java diff --git a/07rpc/rpc01/rpcfx-core/pom.xml b/07rpc/rpc01/rpcfx-core/pom.xml index 8f11dc5d..f7fc2e5e 100644 --- a/07rpc/rpc01/rpcfx-core/pom.xml +++ b/07rpc/rpc01/rpcfx-core/pom.xml @@ -23,7 +23,15 @@ fastjson 1.2.70 - + + org.apache.httpcomponents + httpclient + 4.5.2 + + + net.bytebuddy + byte-buddy + com.squareup.okhttp3 okhttp diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java new file mode 100644 index 00000000..d6add928 --- /dev/null +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/annotation/RpcfxService.java @@ -0,0 +1,17 @@ +package io.kimmking.rpcfx.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Inherited +public @interface RpcfxService { + Class interfaceClass() default void.class; + String interfaceName() default ""; +} diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java index f7c48068..df40e78b 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java @@ -4,4 +4,6 @@ public interface RpcfxResolver { Object resolve(String serviceClass); + void initService(String servicePath); + } diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java index 3d7b3788..57b14570 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java @@ -3,17 +3,23 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.parser.ParserConfig; +import io.kimmking.rpcfx.utils.HttpUtils; +import io.kimmking.rpcfx.exception.RpcException; import io.kimmking.rpcfx.api.RpcfxRequest; import io.kimmking.rpcfx.api.RpcfxResponse; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.implementation.InvocationHandlerAdapter; +import net.bytebuddy.matcher.ElementMatchers; import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; +import okhttp3.internal.http2.ErrorCode; import java.io.IOException; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Map; public final class Rpcfx { @@ -21,11 +27,25 @@ public final class Rpcfx { ParserConfig.getGlobalInstance().addAccept("io.kimmking"); } - public static T create(final Class serviceClass, final String url) { - - // 0. 替换动态代理 -> AOP - return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url)); + public static T create(final Class serviceClass, final String url) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + // 0. 替换动态代理 -> 字节码增强 + return serviceClass.cast(getByteBuddyProxy(serviceClass, url)); + } + private static Object getByteBuddyProxy(Class serviceClass, String url) + throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + return new ByteBuddy() + .subclass(Object.class) + .name(serviceClass.getCanonicalName() + "$ByteBuddyProxy") + .implement(serviceClass) + .method(ElementMatchers.any()) + .intercept(InvocationHandlerAdapter.of(new RpcfxInvocationHandler(serviceClass, url))) + .make() + .load(Rpcfx.class.getClassLoader()) + .getLoaded() + .getDeclaredConstructor() + .newInstance(); } public static class RpcfxInvocationHandler implements InvocationHandler { @@ -39,10 +59,9 @@ public RpcfxInvocationHandler(Class serviceClass, String url) { this.url = url; } - // 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx - // int byte char float double long bool + // TODO: 2020/12/17 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx + // int byte char float double long bool // [], data class - @Override public Object invoke(Object proxy, Method method, Object[] params) throws Throwable { RpcfxRequest request = new RpcfxRequest(); @@ -51,10 +70,12 @@ public Object invoke(Object proxy, Method method, Object[] params) throws Throwa request.setParams(params); RpcfxResponse response = post(request, url); - // 这里判断response.status,处理异常 // 考虑封装一个全局的RpcfxException - + if(!response.isStatus()){ + System.out.println("error" + response); + throw new RpcException(ErrorCode.CONNECT_ERROR); + } return JSON.parse(response.getResult().toString()); } @@ -64,12 +85,7 @@ private RpcfxResponse post(RpcfxRequest req, String url) throws IOException { // 1.可以复用client // 2.尝试使用httpclient或者netty client - OkHttpClient client = new OkHttpClient(); - final Request request = new Request.Builder() - .url(url) - .post(RequestBody.create(JSONTYPE, reqJson)) - .build(); - String respJson = client.newCall(request).execute().body().string(); + String respJson = HttpUtils.httpPostJson(reqJson,url); System.out.println("resp json: "+respJson); return JSON.parseObject(respJson, RpcfxResponse.class); } diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/exception/RpcException.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/exception/RpcException.java new file mode 100644 index 00000000..62db1452 --- /dev/null +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/exception/RpcException.java @@ -0,0 +1,78 @@ +package io.kimmking.rpcfx.exception; + +import okhttp3.internal.http2.ErrorCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static okhttp3.internal.http2.ErrorCode.PROTOCOL_ERROR; + +/** + * @author tangtian + * @version 1.0 + * @className RpcException + * @description + * @date 2020/12/16 8:21 PM + **/ +public class RpcException extends RuntimeException { + private static final Logger LOG = LoggerFactory.getLogger(RpcException.class); + private static final long serialVersionUID = -7864604160297181941L; + + /** 错误码 */ + protected final ErrorCode errorCode; +// NO_ERROR(0), +// PROTOCOL_ERROR(1), +// INTERNAL_ERROR(2), +// FLOW_CONTROL_ERROR(3), +// REFUSED_STREAM(7), +// CANCEL(8), +// COMPRESSION_ERROR(9), +// CONNECT_ERROR(10), +// ENHANCE_YOUR_CALM(11), +// INADEQUATE_SECURITY(12), +// HTTP_1_1_REQUIRED(13); + + /** + * 这个是和谐一些不必要的地方,冗余的字段 + * 尽量不要用 + */ + private String code; + + /** + * 无参默认构造UNSPECIFIED + */ + public RpcException() { + super(""); + this.errorCode = PROTOCOL_ERROR; + } + + /** + * 指定错误码构造通用异常 + * @param errorCode 错误码 + */ + public RpcException(final ErrorCode errorCode) { + super(""); + this.errorCode = errorCode; + } + + /** + * 构造通用异常 + * @param errorCode 错误码 + * @param detailedMessage 详细描述 + * @param t 导火索 + */ + public RpcException(final ErrorCode errorCode, final String detailedMessage, + final Throwable t) { + super(detailedMessage, t); + this.errorCode = errorCode; + } + + /** + * Getter method for property errorCode. + * + * @return property value of errorCode + */ + public ErrorCode getErrorCode() { + return errorCode; + } + +} diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java index a6f77dac..54bffdef 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java @@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; +import io.kimmking.rpcfx.exception.RpcException; import io.kimmking.rpcfx.api.RpcfxRequest; import io.kimmking.rpcfx.api.RpcfxResolver; import io.kimmking.rpcfx.api.RpcfxResponse; @@ -13,9 +14,10 @@ public class RpcfxInvoker { private RpcfxResolver resolver; - - public RpcfxInvoker(RpcfxResolver resolver){ + public RpcfxInvoker(RpcfxResolver resolver, String servicePath){ this.resolver = resolver; + // 初始化服务 + resolver.initService(servicePath); } public RpcfxResponse invoke(RpcfxRequest request) { @@ -24,25 +26,21 @@ public RpcfxResponse invoke(RpcfxRequest request) { // 作业1:改成泛型和反射 Object service = resolver.resolve(serviceClass);//this.applicationContext.getBean(serviceClass); - try { Method method = resolveMethodFromClass(service.getClass(), request.getMethod()); - Object result = method.invoke(service, request.getParams()); // dubbo, fastjson, - // 两次json序列化能否合并成一个 - response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName)); + Object result = method.invoke(service, request.getParams()); + // 两次json序列化合并成一个 + response.setResult(result); response.setStatus(true); - return response; - } catch ( IllegalAccessException | InvocationTargetException e) { - + } catch (IllegalAccessException | InvocationTargetException e) { // 3.Xstream - // 2.封装一个统一的RpcfxException // 客户端也需要判断异常 e.printStackTrace(); - response.setException(e); + response.setException(new RpcException()); response.setStatus(false); - return response; } + return response; } private Method resolveMethodFromClass(Class klass, String methodName) { diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/HttpUtils.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/HttpUtils.java new file mode 100644 index 00000000..85dcdb84 --- /dev/null +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/utils/HttpUtils.java @@ -0,0 +1,68 @@ +package io.kimmking.rpcfx.utils; + +import org.apache.http.NameValuePair; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author tangtian + * @version 1.0 + * @className HttpUtils + * @description + * @date 2020/12/16 8:06 PM + **/ +public class HttpUtils { + private final static CloseableHttpClient httpClient; + private final static int CONNECT_TIMEOUT = 5000; + private final static RequestConfig requestConfig; + + static { + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(500); + connectionManager.setDefaultMaxPerRoute(50); + requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(CONNECT_TIMEOUT) + .setConnectTimeout(CONNECT_TIMEOUT) + .setSocketTimeout(CONNECT_TIMEOUT).build(); + + httpClient = HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + .build(); + } + + public static String httpPostJson(final String json, final String url) { + final HttpPost httpPost = new HttpPost(url); + httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); + httpPost.setHeader(HTTP.CONTENT_TYPE, "application/json;charset=utf-8"); + StringEntity requestEntity = new StringEntity(json, StandardCharsets.UTF_8); + httpPost.setEntity(requestEntity); + try { + CloseableHttpResponse response = httpClient.execute(httpPost); + return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException("httpclient请求失败!"); + } + } +} diff --git a/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java index 17537bb9..11e4683e 100644 --- a/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java +++ b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java @@ -7,6 +7,8 @@ import io.kimmking.rpcfx.demo.api.UserService; import org.springframework.boot.autoconfigure.SpringBootApplication; +import java.lang.reflect.InvocationTargetException; + @SpringBootApplication public class RpcfxClientApplication { @@ -15,7 +17,7 @@ public class RpcfxClientApplication { // nexus, userserivce -> userdao -> user // - public static void main(String[] args) { + public static void main(String[] args) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { // UserService service = new xxx(); // service.findById diff --git a/07rpc/rpc01/rpcfx-demo-provider/pom.xml b/07rpc/rpc01/rpcfx-demo-provider/pom.xml index c0f9b5c9..14b7715d 100644 --- a/07rpc/rpc01/rpcfx-demo-provider/pom.xml +++ b/07rpc/rpc01/rpcfx-demo-provider/pom.xml @@ -37,7 +37,12 @@ org.springframework.boot spring-boot-starter-web - + + + org.reflections + reflections + 0.9.12 + org.springframework.boot spring-boot-starter-test diff --git a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java index 65cffbd2..1625bbb7 100644 --- a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java +++ b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java @@ -1,20 +1,32 @@ package io.kimmking.rpcfx.demo.provider; +import io.kimmking.rpcfx.annotation.RpcfxService; import io.kimmking.rpcfx.api.RpcfxResolver; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; +import org.reflections.Reflections; -public class DemoResolver implements RpcfxResolver, ApplicationContextAware { +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; - private ApplicationContext applicationContext; +public class DemoResolver implements RpcfxResolver { + + private final static Map SERVICE_CACHE = new ConcurrentHashMap<>(); @Override - public void setApplicationContext(ApplicationContext applicationContext) { - this.applicationContext = applicationContext; + public Object resolve(String serviceClass) { + return SERVICE_CACHE.get(serviceClass); } @Override - public Object resolve(String serviceClass) { - return this.applicationContext.getBean(serviceClass); + public void initService(String servicePath) { + Reflections reflections = new Reflections(servicePath); + Set> classes = reflections.getTypesAnnotatedWith(RpcfxService.class); + classes.forEach(c -> { + try { + SERVICE_CACHE.put(c.getInterfaces()[0].getName(), c.newInstance()); + } catch (InstantiationException | IllegalAccessException e) { + e.printStackTrace(); + } + }); } } diff --git a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/HttpMessageConverterConfig.java b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/HttpMessageConverterConfig.java new file mode 100644 index 00000000..51ac44b4 --- /dev/null +++ b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/HttpMessageConverterConfig.java @@ -0,0 +1,37 @@ +package io.kimmking.rpcfx.demo.provider; + +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson.support.config.FastJsonConfig; +import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.http.HttpMessageConverters; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.nio.charset.StandardCharsets; + +/** + * @author tangtian + * @version 1.0 + * @className HttpMessageConverterConfig + * @description + * @date 2020/12/17 7:23 AM + **/ +@Configuration +public class HttpMessageConverterConfig { + private static final Logger LOG = LoggerFactory.getLogger(HttpMessageConverterConfig.class); + @Bean + public HttpMessageConverters fastJsonHttpMessageConverters() { + FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter(); + FastJsonConfig fastJsonConfig = new FastJsonConfig(); + SerializerFeature[] serializerFeatures = new SerializerFeature[] { + SerializerFeature.DisableCircularReferenceDetect, + SerializerFeature.WriteClassName + }; + fastJsonConfig.setSerializerFeatures(serializerFeatures); + fastJsonConfig.setCharset(StandardCharsets.UTF_8); + fastConverter.setFastJsonConfig(fastJsonConfig); + return new HttpMessageConverters(fastConverter); + } +} diff --git a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java index 266618de..06447d3b 100644 --- a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java +++ b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java @@ -32,7 +32,7 @@ public RpcfxResponse invoke(@RequestBody RpcfxRequest request) { @Bean public RpcfxInvoker createInvoker(@Autowired RpcfxResolver resolver){ - return new RpcfxInvoker(resolver); + return new RpcfxInvoker(resolver,"io.kimmking.rpcfx.demo.provider"); } @Bean @@ -40,16 +40,16 @@ public RpcfxResolver createResolver(){ return new DemoResolver(); } - // 能否去掉name - // - @Bean(name = "io.kimmking.rpcfx.demo.api.UserService") - public UserService createUserService(){ - return new UserServiceImpl(); - } - - @Bean(name = "io.kimmking.rpcfx.demo.api.OrderService") - public OrderService createOrderService(){ - return new OrderServiceImpl(); - } +// // 能否去掉name +// // +// @Bean(name = "io.kimmking.rpcfx.demo.api.UserService") +// public UserService createUserService(){ +// return new UserServiceImpl(); +// } +// +// @Bean(name = "io.kimmking.rpcfx.demo.api.OrderService") +// public OrderService createOrderService(){ +// return new OrderServiceImpl(); +// } } From 9eadd62bd6cb6089db5a9c647a6b9cfe9928577b Mon Sep 17 00:00:00 2001 From: tangtian8 <765502922@qq.com> Date: Fri, 15 Jan 2021 08:59:59 +0800 Subject: [PATCH 3/3] update_20210115 --- 09mq/pom.xml | 2 +- .../mq/activemq/ActivemqApplication.java | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/09mq/pom.xml b/09mq/pom.xml index dd3f8b9b..b6d1b269 100644 --- a/09mq/pom.xml +++ b/09mq/pom.xml @@ -27,7 +27,7 @@ org.apache.activemq activemq-all - 5.16.0 + 5.15.6 diff --git a/09mq/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java b/09mq/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java index 75e13283..f6b64332 100644 --- a/09mq/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java +++ b/09mq/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java @@ -7,7 +7,13 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import javax.jms.*; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import java.util.concurrent.atomic.AtomicInteger; @@ -18,7 +24,7 @@ public static void main(String[] args) { // 定义Destination Destination destination = new ActiveMQTopic("test.topic"); - // Destination destination = new ActiveMQQueue("test.queue"); + // Destination destination = new ActiveMQQueue("test.queue"); testDestination(destination); @@ -28,7 +34,7 @@ public static void main(String[] args) { public static void testDestination(Destination destination) { try { // 创建连接和会话 - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://120.79.218.62:61616"); ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -37,6 +43,7 @@ public static void testDestination(Destination destination) { MessageConsumer consumer = session.createConsumer( destination ); final AtomicInteger count = new AtomicInteger(0); MessageListener listener = new MessageListener() { + @Override public void onMessage(Message message) { try { // 打印所有的消息内容 @@ -62,9 +69,9 @@ public void onMessage(Message message) { producer.send(message); } - Thread.sleep(2000); - session.close(); - conn.close(); +// Thread.sleep(2000); +// session.close(); +// conn.close(); } catch (Exception e) {