diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientHandler.java new file mode 100644 index 00000000..694ad881 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientHandler.java @@ -0,0 +1,34 @@ +package io.github.kimmking.gateway.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.*; +import io.netty.util.CharsetUtil; + +import java.net.URI; + +public class HttpClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + URI uri = new URI("/user/get"); + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, uri.toASCIIString()); + request.headers().add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE); + request.headers().add(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes()); + ctx.writeAndFlush(request); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("msg ->" + msg); + + if (msg instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) msg; + ByteBuf buf = response.content(); + String result = buf.toString(CharsetUtil.UTF_8); + System.out.println("response -> " + result); + + } + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientTest.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientTest.java new file mode 100644 index 00000000..45e9dce3 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/HttpClientTest.java @@ -0,0 +1,57 @@ +package io.github.kimmking.gateway.client; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.util.Objects; + +public class HttpClientTest { + + public static void main(String[] args) throws IOException { + + for (int i = 0; i < 10; i++) { + httpGet(); + } + + } + + private static void httpGet() throws IOException { + + CloseableHttpResponse response = null; + HttpGet get = null; + CloseableHttpClient client = null; + + try { + + client = HttpClients.createDefault(); + + get = new HttpGet("http://localhost:8801"); + response = client.execute(get); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + HttpEntity entity = response.getEntity(); + String html = EntityUtils.toString(entity, "utf-8"); + System.out.println(html); + } + } catch (Exception e){ + e.printStackTrace(); + } + finally { + if (Objects.nonNull(response)) { + response.close(); + } + + if (Objects.nonNull(get)) { + get.releaseConnection(); + } + + client.close(); + } + } + +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/NettyClientTest.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/NettyClientTest.java new file mode 100644 index 00000000..f79ea62c --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/client/NettyClientTest.java @@ -0,0 +1,49 @@ +package io.github.kimmking.gateway.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; + +public class NettyClientTest { + + public static void start(String host,int port){ + + EventLoopGroup group = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE,true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new HttpClientCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new HttpClientHandler()); + } + }); + + ChannelFuture future = b.connect(host, port).sync(); + + ChannelFuture channelFuture = future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("关闭线程组"); + group.shutdownGracefully(); + } + + } + + public static void main(String[] args) { + start("localhost",8801); + } + +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/MyHttpRequestFilter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/MyHttpRequestFilter.java new file mode 100644 index 00000000..419e6abd --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/MyHttpRequestFilter.java @@ -0,0 +1,16 @@ +package io.github.kimmking.gateway.filter; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; + +/** + * 自己实现的filter + */ +public class MyHttpRequestFilter implements HttpRequestFilter { + + @Override + public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) { + System.out.println("header content set in the filter"); + fullRequest.headers().set("xjava","Paulguard"); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java index 69b40fde..4eb9ef2a 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java @@ -1,23 +1,19 @@ package io.github.kimmking.gateway.inbound; -import io.github.kimmking.gateway.filter.HeaderHttpRequestFilter; -import io.github.kimmking.gateway.filter.HttpRequestFilter; +import io.github.kimmking.gateway.filter.MyHttpRequestFilter; import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.util.ReferenceCountUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; public class HttpInboundHandler extends ChannelInboundHandlerAdapter { - private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class); private final List proxyServer; private HttpOutboundHandler handler; - private HttpRequestFilter filter = new HeaderHttpRequestFilter(); + private MyHttpRequestFilter filter = new MyHttpRequestFilter(); public HttpInboundHandler(List proxyServer) { this.proxyServer = proxyServer; diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java index 97d54ccd..6193781a 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java @@ -53,6 +53,7 @@ public void run() throws Exception { System.out.println("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/'); ch.closeFuture().sync(); } finally { + System.out.println("关闭线程组"); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java index c20c9be5..c60564e3 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/httpclient4/HttpOutboundHandler.java @@ -4,8 +4,8 @@ import io.github.kimmking.gateway.filter.HeaderHttpResponseFilter; import io.github.kimmking.gateway.filter.HttpRequestFilter; import io.github.kimmking.gateway.filter.HttpResponseFilter; +import io.github.kimmking.gateway.router.WeightHttpEndPointRouter; import io.github.kimmking.gateway.router.HttpEndpointRouter; -import io.github.kimmking.gateway.router.RandomHttpEndpointRouter; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -22,10 +22,9 @@ import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; +import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.concurrent.*; -import java.util.logging.Filter; import java.util.stream.Collectors; import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; @@ -39,7 +38,7 @@ public class HttpOutboundHandler { private List backendUrls; HttpResponseFilter filter = new HeaderHttpResponseFilter(); - HttpEndpointRouter router = new RandomHttpEndpointRouter(); + HttpEndpointRouter router = new WeightHttpEndPointRouter(); public HttpOutboundHandler(List backends) { @@ -73,9 +72,18 @@ private String formatUrl(String backend) { } public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, HttpRequestFilter filter) { - String backendUrl = router.route(this.backendUrls); + + List weights = new ArrayList<>(); + weights.add(70); + weights.add(30); + + String backendUrl = router.route(this.backendUrls,weights); final String url = backendUrl + fullRequest.uri(); + + System.out.println("被访问的url:"+url); + filter.filter(fullRequest, ctx); + proxyService.submit(()->fetchGet(fullRequest, ctx, url)); } @@ -84,6 +92,9 @@ private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext //httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE); httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); httpGet.setHeader("mao", inbound.headers().get("mao")); + httpGet.setHeader("javaParam",inbound.headers().get("xjava")); + + System.out.println("xjava:" + inbound.headers().get("xjava")); httpclient.execute(httpGet, new FutureCallback() { @Override diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/HttpEndpointRouter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/HttpEndpointRouter.java index 8e307ab7..fc3a0a51 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/HttpEndpointRouter.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/HttpEndpointRouter.java @@ -5,6 +5,8 @@ public interface HttpEndpointRouter { String route(List endpoints); + + String route(List endpoints,List weight); // Load Balance // Random diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandomHttpEndpointRouter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandomHttpEndpointRouter.java index 684d1ba5..7352eae7 100644 --- a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandomHttpEndpointRouter.java +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/RandomHttpEndpointRouter.java @@ -10,4 +10,9 @@ public String route(List urls) { Random random = new Random(System.currentTimeMillis()); return urls.get(random.nextInt(size)); } + + @Override + public String route(List endpoints, List weight) { + throw new RuntimeException("Unsupported Method"); + } } diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/WeightHttpEndPointRouter.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/WeightHttpEndPointRouter.java new file mode 100644 index 00000000..517b6465 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/router/WeightHttpEndPointRouter.java @@ -0,0 +1,43 @@ +package io.github.kimmking.gateway.router; + +import java.util.List; +import java.util.Random; + +public class WeightHttpEndPointRouter implements HttpEndpointRouter { + + @Override + public String route(List endpoints) { + throw new RuntimeException("Unsupported Method"); + } + + @Override + public String route(List endpoints, List weight) { + + Random random = new Random(System.currentTimeMillis()); + int randomInt = random.nextInt(100); + + for (int i = 0; i < weight.size(); i++) { + + if (i == weight.size() - 1) { + return endpoints.get(i); + } + + Integer curWeight = weight.get(i); + if (randomInt > curWeight) { + continue; + } + + return endpoints.get(i); + } + + return endpoints.get(0); + } + + public static void main(String[] args) { + Random random = new Random(System.currentTimeMillis()); + System.out.println(random.nextInt(100)); + System.out.println(random.nextInt(100)); + System.out.println(random.nextInt(100)); + System.out.println(random.nextInt(100)); + } +} diff --git a/02nio/nio02/src/main/java/io/github/kimmking/gateway/server/HttpServerTest.java b/02nio/nio02/src/main/java/io/github/kimmking/gateway/server/HttpServerTest.java new file mode 100644 index 00000000..753c8813 --- /dev/null +++ b/02nio/nio02/src/main/java/io/github/kimmking/gateway/server/HttpServerTest.java @@ -0,0 +1,60 @@ +package io.github.kimmking.gateway.server; + +import java.io.IOException; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; + +public class HttpServerTest { + + static int port = 8801; + + public static void main(String[] args) throws IOException { + + if (args != null && args.length > 0) { + try { + port = Integer.valueOf(args[0]); + } catch (NumberFormatException e) { + e.printStackTrace(); + } + + } + + ServerSocket serverSocket = new ServerSocket(port); + + while (true){ + try { + Socket socket = serverSocket.accept(); + service(socket); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + + private static void service(Socket socket) { + + try { + PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true); + printWriter.println("HTTP/1.1 200 OK"); + printWriter.println("Content-Type:text/html;charset=utf-8"); + String body = "hello,nio" + port; + printWriter.println("Content-Length:" + body.getBytes().length); + printWriter.println(); + + printWriter.write(body); + printWriter.close(); + + //这里主要是为了让httpClient可以在server的socket关闭前,拿到返回的结果 + Thread.sleep(10); + + socket.close(); + + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + + } + +} diff --git a/02nio/nio02/src/main/java/thread/github/pxd/TestDemonThread.java b/02nio/nio02/src/main/java/thread/github/pxd/TestDemonThread.java new file mode 100644 index 00000000..18137a5a --- /dev/null +++ b/02nio/nio02/src/main/java/thread/github/pxd/TestDemonThread.java @@ -0,0 +1,29 @@ +package thread.github.pxd; + +public class TestDemonThread { + + public static void main(String[] args) { + + Runnable task = () -> { + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Thread t = Thread.currentThread(); + System.out.println("当前线程:" + t.getName()); + + }; + + Thread thread = new Thread(task); + thread.setName("test-thread-1"); + + //这个地方如果是守护线程,那么进程里所有的线程都是守护线程,则会在进程结束的时候,会清理掉所有线程 + //thread.setDaemon(true); + thread.start(); + + } + +} diff --git a/02nio/nio02/src/main/java/thread/github/pxd/ThreadABCPrint.java b/02nio/nio02/src/main/java/thread/github/pxd/ThreadABCPrint.java new file mode 100644 index 00000000..1ef0e613 --- /dev/null +++ b/02nio/nio02/src/main/java/thread/github/pxd/ThreadABCPrint.java @@ -0,0 +1,44 @@ +package thread.github.pxd; + +public class ThreadABCPrint { + + volatile static int completeThread = 0; + + public static void main(String[] args) { + + Thread thread1 = new Thread(() -> { + while (true){ + + if (completeThread == 3 || completeThread == 0) { + System.out.print("A"); + completeThread = 1; + } + } + }); + + Thread thread2 = new Thread(() -> { + while (true){ + + if (completeThread == 1) { + System.out.print("B"); + completeThread = 2; + } + } + }); + + Thread thread3 = new Thread(() -> { + while (true){ + + if (completeThread == 2) { + System.out.print("C"); + completeThread = 3; + } + } + }); + + thread1.start(); + thread2.start(); + thread3.start(); + } + +} diff --git a/02nio/nio02/src/main/java/thread/github/pxd/ThreadJoinTest.java b/02nio/nio02/src/main/java/thread/github/pxd/ThreadJoinTest.java new file mode 100644 index 00000000..32a33707 --- /dev/null +++ b/02nio/nio02/src/main/java/thread/github/pxd/ThreadJoinTest.java @@ -0,0 +1,41 @@ +package thread.github.pxd; + +public class ThreadJoinTest { + + public static void main(String[] args) { + + Runnable task = () -> { + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Thread t = Thread.currentThread(); + System.out.println("当前线程:" + t.getName()); + + }; + + Thread thread1 = new Thread(task); + thread1.setName("test-thread-1"); + + Thread thread2 = new Thread(() -> { + try { + Thread t = Thread.currentThread(); + System.out.println("当前线程:" + t.getName()); + + thread1.join(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }); + thread2.setName("test-thread-2"); + + thread1.start(); + thread2.start(); + } + +} diff --git a/07rpc/rpc01/rpcfx-core/pom.xml b/07rpc/rpc01/rpcfx-core/pom.xml index 4570a59d..ef5434d3 100644 --- a/07rpc/rpc01/rpcfx-core/pom.xml +++ b/07rpc/rpc01/rpcfx-core/pom.xml @@ -70,6 +70,28 @@ + + + org.springframework + spring-aop + + + + org.aspectj + aspectjrt + + + + org.aspectj + aspectjweaver + + + + io.netty + netty-all + 4.1.45.Final + + diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java index f7c48068..d74909b2 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/RpcfxResolver.java @@ -2,6 +2,6 @@ public interface RpcfxResolver { - Object resolve(String serviceClass); + T resolve(String serviceClass,Class type); } diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/HttpClientHandler.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/HttpClientHandler.java new file mode 100644 index 00000000..40d45e5c --- /dev/null +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/HttpClientHandler.java @@ -0,0 +1,60 @@ +package io.kimmking.rpcfx.client; + +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import com.alibaba.fastjson.JSON; + +import io.kimmking.rpcfx.api.RpcfxRequest; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; + +public class HttpClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + + URI uri = new URI("/"); + + RpcfxRequest parameters = new RpcfxRequest(); + parameters.setServiceClass("io.kimmking.rpcfx.demo.api.UserService"); + parameters.setMethod("findById"); + + Object[] params = new Object[1]; + params[0] = 1; + parameters.setParams(params); + + String content = JSON.toJSONString(parameters); + + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.POST, uri.toASCIIString(), + Unpooled.wrappedBuffer(content.getBytes(StandardCharsets.UTF_8))); + request.headers().add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE); + request.headers().add(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes()); + request.headers().add(HttpHeaderNames.CONTENT_TYPE,"application/json; charset=utf-8"); + + ctx.writeAndFlush(request); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("msg ->" + msg); + + if (msg instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) msg; + ByteBuf buf = response.content(); + String result = buf.toString(CharsetUtil.UTF_8); + System.out.println("response -> " + result); + + } + } +} diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/NettyClient.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/NettyClient.java new file mode 100644 index 00000000..2d630b38 --- /dev/null +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/NettyClient.java @@ -0,0 +1,54 @@ +package io.kimmking.rpcfx.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; + +public class NettyClient { + + public static String start(String host, int port){ + + EventLoopGroup group = new NioEventLoopGroup(); + ChannelFuture future = null; + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE,true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new HttpClientCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new HttpClientHandler()); + } + }); + + future = b.connect(host, port).sync(); + ChannelFuture channelFuture = future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("关闭线程组"); + group.shutdownGracefully(); + } + + return ""; + } + + public static void main(String[] args) { + String result = start("localhost", 8080); + System.out.println("the result:"+result); + } + +} diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java index 5d1ae517..fa5c0958 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java @@ -94,20 +94,19 @@ public Object invoke(Object proxy, Method method, Object[] params) throws Throwa return JSON.parse(response.getResult().toString()); } - private RpcfxResponse post(RpcfxRequest req, String url) throws IOException { + private RpcfxResponse post(RpcfxRequest req, String url) { String reqJson = JSON.toJSONString(req); System.out.println("req json: "+reqJson); // 1.可以复用client // 2.尝试使用httpclient或者netty client - OkHttpClient client = new OkHttpClient(); - final Request request = new Request.Builder() - .url(url) - .post(RequestBody.create(JSONTYPE, reqJson)) - .build(); - String respJson = client.newCall(request).execute().body().string(); - System.out.println("resp json: "+respJson); - return JSON.parseObject(respJson, RpcfxResponse.class); + String[] hostAndPort = url.split(":"); + if (hostAndPort.length == 2) { + String result = NettyClient.start(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + return JSON.parseObject(result, RpcfxResponse.class); + } + + return null; } } } diff --git a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java index a6f77dac..4a196697 100644 --- a/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java +++ b/07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java @@ -23,7 +23,13 @@ public RpcfxResponse invoke(RpcfxRequest request) { String serviceClass = request.getServiceClass(); // 作业1:改成泛型和反射 - Object service = resolver.resolve(serviceClass);//this.applicationContext.getBean(serviceClass); + Class interfaceClass = null; + try { + interfaceClass = Class.forName(serviceClass); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + Object service = resolver.resolve(serviceClass, interfaceClass);//this.applicationContext.getBean(serviceClass); try { Method method = resolveMethodFromClass(service.getClass(), request.getMethod()); diff --git a/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java index a4187d14..e82c96b5 100644 --- a/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java +++ b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java @@ -1,21 +1,22 @@ package io.kimmking.rpcfx.demo.consumer; +import java.util.List; + +import javax.annotation.Resource; + import io.kimmking.rpcfx.api.Filter; import io.kimmking.rpcfx.api.LoadBalancer; import io.kimmking.rpcfx.api.Router; import io.kimmking.rpcfx.api.RpcfxRequest; import io.kimmking.rpcfx.client.Rpcfx; -import io.kimmking.rpcfx.demo.api.Order; -import io.kimmking.rpcfx.demo.api.OrderService; import io.kimmking.rpcfx.demo.api.User; import io.kimmking.rpcfx.demo.api.UserService; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.EnableAspectJAutoProxy; -import java.util.List; -import java.util.Random; - -@SpringBootApplication +@SpringBootApplication(scanBasePackages = "io.kimmking") +@EnableAspectJAutoProxy public class RpcfxClientApplication { // 二方库 @@ -28,7 +29,7 @@ public static void main(String[] args) { // UserService service = new xxx(); // service.findById - UserService userService = Rpcfx.create(UserService.class, "http://localhost:8080/"); + UserService userService = Rpcfx.create(UserService.class, "localhost:8080"); User user = userService.findById(1); System.out.println("find user id=1 from server: " + user.getName()); diff --git a/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/aop/ProxyInterceptor.java b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/aop/ProxyInterceptor.java new file mode 100644 index 00000000..ec2ecf5d --- /dev/null +++ b/07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/aop/ProxyInterceptor.java @@ -0,0 +1,44 @@ +package io.kimmking.rpcfx.demo.consumer.aop; + +import java.lang.reflect.Method; + +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.core.Ordered; +import org.springframework.stereotype.Component; + +/** + * @author by 平向东 + * @date 2021/10/3 11:26 Copyright 2021 北京交个朋友数码科技有限公司. All rights reserved. + */ +@Component +@Aspect +@Slf4j +public class ProxyInterceptor implements Ordered { + + //@Around("execution(* com.company.controller.*.*(..))") + @Around("execution(* io.kimmking.rpcfx.demo.api.UserService.findById(..))") + public Object around(ProceedingJoinPoint point) throws Throwable { + + log.error("enter around"); + + MethodSignature signature = (MethodSignature)point.getSignature(); + Method method = signature.getMethod(); + + try { + return point.proceed(); + }catch (Exception e){ + e.printStackTrace(); + } + + return null; + } + + @Override + public int getOrder() { + return 1; + } +} diff --git a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java index 65cffbd2..73e28072 100644 --- a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java +++ b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/DemoResolver.java @@ -14,7 +14,7 @@ public void setApplicationContext(ApplicationContext applicationContext) { } @Override - public Object resolve(String serviceClass) { - return this.applicationContext.getBean(serviceClass); + public T resolve(String serviceClass,Class type) { + return this.applicationContext.getBean(serviceClass,type); } } diff --git a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java index d79e422e..0c33629e 100644 --- a/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java +++ b/07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java @@ -1,6 +1,5 @@ package io.kimmking.rpcfx.demo.provider; -import com.alibaba.fastjson.JSON; import io.kimmking.rpcfx.api.RpcfxRequest; import io.kimmking.rpcfx.api.RpcfxResolver; import io.kimmking.rpcfx.api.RpcfxResponse; @@ -8,25 +7,22 @@ import io.kimmking.rpcfx.demo.api.OrderService; import io.kimmking.rpcfx.demo.api.UserService; import io.kimmking.rpcfx.server.RpcfxInvoker; -import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import java.net.InetAddress; -import java.net.InterfaceAddress; -import java.net.UnknownHostException; @SpringBootApplication @RestController +@EnableAspectJAutoProxy public class RpcfxServerApplication { public static void main(String[] args) throws Exception { @@ -94,12 +90,12 @@ public RpcfxResolver createResolver(){ @Bean(name = "io.kimmking.rpcfx.demo.api.UserService") - public UserService createUserService(){ + public UserService userService(){ return new UserServiceImpl(); } @Bean(name = "io.kimmking.rpcfx.demo.api.OrderService") - public OrderService createOrderService(){ + public OrderService orderService(){ return new OrderServiceImpl(); }