diff --git a/02nio/nio01/pom.xml b/02nio/nio01/pom.xml index e42a62f2..3ae38c66 100644 --- a/02nio/nio01/pom.xml +++ b/02nio/nio01/pom.xml @@ -59,6 +59,12 @@ netty-all 4.1.51.Final + + io.netty + netty-all + 4.1.55.Final + compile + diff --git a/02nio/nio01/src/main/java/Main.java b/02nio/nio01/src/main/java/Main.java index b07dca92..35cc141f 100644 --- a/02nio/nio01/src/main/java/Main.java +++ b/02nio/nio01/src/main/java/Main.java @@ -10,7 +10,7 @@ public class Main { public static void main(String[] args) { - Map map = new HashMap<>(); + Map map = new HashMap(); map.put("1", HttpServer01.class); map.put("2", HttpServer02.class); map.put("3", HttpServer03.class); diff --git a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java index b4f8507d..b6d24957 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java @@ -4,25 +4,25 @@ import java.util.List; public class ForeachDemo { - - private int x=1; - + + private int x = 1; + public static void main(String[] args) { - + ForeachDemo demo = new ForeachDemo(); - + demo.test(); - + System.out.println(demo.x); } - + private void test() { - List list = Arrays.asList(1,2); + List list = Arrays.asList(1, 2); int y = 1; list.forEach(e -> { - x=2; - //y=2; // can't be compiled + x = 2; +// y = 2; // can't be compiled }); } - + } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java index 916819d6..34b8098e 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java @@ -3,19 +3,20 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Optional; -public class LambdaDemo { - - public static void main(String args[]){ +public class LambdaDemo { + + public static void main(String args[]) { LambdaDemo demo = new LambdaDemo(); - + MathOperation op = new MathOperation() { @Override public Integer operation(int a, int b) { return 1; } }; - + MathOperation op1 = (a, b) -> 1; @@ -33,55 +34,56 @@ public Integer operation(int a, int b) { MathOperation subtraction = (int a, int b) -> a - b + 1.0; // 大括号中的返回语句 - MathOperation multiplication = (int a, int b) -> { + MathOperation multiplication = (int a, int b) -> { int c = 1000; - return a * b + c; + return a * b + c; }; - + // 没有大括号及返回语句 MathOperation division = (int a, int b) -> a / b; - + System.out.println("10 + 5 = " + demo.operate(10, 5, addition)); System.out.println("10 - 5 = " + demo.operate(10, 5, subtraction)); System.out.println("10 x 5 = " + demo.operate(10, 5, multiplication)); System.out.println("10 / 5 = " + demo.operate(10, 5, division)); - + //System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> new Double(Math.pow(a,b)).intValue())); - - System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a,b))); - + + System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a, b))); + // 不用括号 GreetingService greetService1 = message -> System.out.println("Hello " + message); - + // 用括号 + GreetingService greetService2 = (message) -> { System.out.println(message); }; GreetingService greetService3 = System.out::println; - - Arrays.asList(1,2,3).forEach( x -> System.out.println(x+3)); - Arrays.asList(1,2,3).forEach( LambdaDemo::println ); - + + Arrays.asList(1, 2, 3).forEach(x -> System.out.println(x + 3)); + Arrays.asList(1, 2, 3).forEach(LambdaDemo::println); + greetService1.sayMessage("kimmking"); greetService2.sayMessage("Java"); } - + private static void println(int x) { - System.out.println(x+3); + System.out.println(x + 3); } - + interface MathOperation { T operation(int a, int b); // 返回类型+函数名+参数类型的列表 } - + interface GreetingService { void sayMessage(String message); } - - private T operate(int a, int b, MathOperation mathOperation){ + + private T operate(int a, int b, MathOperation mathOperation) { return mathOperation.operation(a, b); } - + } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java index eb4267d8..d9d40afd 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java @@ -3,20 +3,22 @@ import com.alibaba.fastjson.JSON; import java.io.IOException; + import java.util.*; import java.util.stream.Collectors; public class StreamDemo { - + public static void main(String[] args) throws IOException { - - List list = Arrays.asList(4,2,3,5,1,2,2,7,6); + + List list = Arrays.asList(4, 2, 3, 5, 1, 2, 2, 7, 6); print(list); - + // Optional Optional first = list.stream().findFirst(); System.out.println(first.map(i -> i * 100).orElse(100)); + int sum = list.stream().filter( i -> i<4).distinct().reduce(0,(a,b)->a+b); System.out.println("sum="+sum); @@ -24,22 +26,39 @@ public static void main(String[] args) throws IOException { //Map map1 = list.stream().collect(Collectors.toMap(a->a,a->(a+1))); Map map = list.stream().parallel().collect(Collectors.toMap(a->a,a->(a+1),(a,b)->a, LinkedHashMap::new)); print(map); - - + + map.forEach((k, v) -> System.out.println("key:value = " + k + ":" + v)); - List list1 = map.entrySet().parallelStream().map(e -> e.getKey()+e.getValue()).collect(Collectors.toList()); + List list1 = + map.entrySet().parallelStream().map(e -> e.getKey() + e.getValue()).collect(Collectors.toList()); print(list1); - + // parallelStream() - + // 总结: // 1. Fluent API:继续Stream // 2. 终止操作:得到结果 - - + + test(); + test2(); + } + + static int one; + + static void test() { + List b = new ArrayList<>(); + b.add(1); + b.forEach((x) -> one = 2); + System.out.println("test" + one); } - - + + static void test2() { + List b = new ArrayList<>(); + b.add(1); + b.forEach(x -> x = 2); + System.out.println("test" + b.get(0)); + } + private static void print(Object obj) { System.out.println(JSON.toJSONString(obj)); } diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md new file mode 100644 index 00000000..fa7c6aab --- /dev/null +++ b/09mq/activemq-demo/README.md @@ -0,0 +1,42 @@ + +## Week13 周四作业: +### 1. [(必做)搭建 ActiveMQ 服务,基于 JMS,写代码分别实现对于 queue 和 topic 的消息生产和消费,代码提交到 github。](src/main/java/io/byk/activemq/jms) + +使用`JmsMessagingTemplate`模板类来实现 + +### 2. [(选做)基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue/order) + +> - 一个程序往表里写新订单,标记状态为未处理 (status=0); +> - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; +> - (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。 + +#### 解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题 + +在`application.yml`文件下添加配置信息: + +```sh +mybatis: + mapper-locations: classpath:mapper/*.xml +``` + +#### ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near + +建议不要在创建表的过程中使用mysql保留字,避免后期造成麻烦 + +### 3.[(选做)将上述订单处理场景,改成使用 ActiveMQ 发送消息处理模式。](src/main/java/io/byk/queue/order) + +#### ActiveMQ序列化异常Forbidden class ! This class is not trusted to be serialized as ObjectMessage payload + +在`application.yml`文件下添加配置信息: + +``` +spring: + activemq: + broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml + user: admin + password: admin + pool: + enabled: false + packages: + trust-all: true +``` \ No newline at end of file diff --git a/09mq/activemq-demo/pom.xml b/09mq/activemq-demo/pom.xml index c3352476..329ad1f6 100644 --- a/09mq/activemq-demo/pom.xml +++ b/09mq/activemq-demo/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.3.0.RELEASE + 2.2.6.RELEASE io.kimmking.javacourse.mq @@ -24,12 +24,6 @@ spring-boot-starter - - org.apache.activemq - activemq-all - 5.16.0 - - org.projectlombok lombok @@ -40,6 +34,37 @@ spring-boot-starter-test test + + + org.springframework.boot + spring-boot-starter-activemq + + + + org.mybatis + mybatis + 3.5.6 + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.1.4 + + + org.springframework.boot + spring-boot-starter-jdbc + + + + mysql + mysql-connector-java + runtime + + + org.mybatis + mybatis-spring + 2.0.6 + @@ -58,5 +83,4 @@ - diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java new file mode 100644 index 00000000..cd88d03a --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java @@ -0,0 +1,49 @@ +package io.byk.activemq.jms; + + +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; + +import javax.annotation.Resource; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import io.byk.activemq.jms.queue.QueueProducer; +import io.byk.activemq.jms.topic.TopicPublisher; +import lombok.extern.slf4j.Slf4j; + +/** + * 启动类 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@SpringBootApplication +@Slf4j +public class JmsActiveMqApplication implements ApplicationRunner { + @Resource + private QueueProducer queueProducer; + @Resource + private TopicPublisher topicPublisher; + + public static void main(String[] args) { + SpringApplication.run(JmsActiveMqApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + // 测试队列 + for (int i = 0; i < 10; i++) { + String message = "队列消息" + i; + queueProducer.sendMessage(ACTIVE_MQ_QUEUE, message); + } + // 测试主题 + for (int i = 0; i < 10; i++) { + String message = "主题消息" + i; + topicPublisher.sendMessage(ACTIVE_MQ_TOPIC, message); + } + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java new file mode 100644 index 00000000..d1e657cc --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java @@ -0,0 +1,23 @@ +package io.byk.activemq.jms.queue; + +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueConsumer { + @JmsListener(destination = ACTIVE_MQ_QUEUE) + public void receiveMessage(String message) { + log.info("<=========== 收到消息" + message); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java new file mode 100644 index 00000000..5e7b0f6f --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java @@ -0,0 +1,33 @@ +package io.byk.activemq.jms.queue; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +@Getter +public class QueueProducer { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String message) { + log.info("============> 发送 Queue 消息" + message); + Destination destination = new ActiveMQQueue(destinationName); + jmsMessagingTemplate.convertAndSend(destination, message); + } + +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java new file mode 100644 index 00000000..2b1a6621 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java @@ -0,0 +1,28 @@ +package io.byk.activemq.jms.topic; + +import javax.jms.ConnectionFactory; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.config.JmsListenerContainerFactory; +import org.springframework.jms.config.SimpleJmsListenerContainerFactory; + +/** + * ContainerFactor 配置 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Configuration +public class JmsContainerConfig { + + @Bean + public JmsListenerContainerFactory myJmsContainerFactory(@Qualifier("jmsConnectionFactory") + ConnectionFactory connectionFactory) { + SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setPubSubDomain(true); + return factory; + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java new file mode 100644 index 00000000..a2cc5437 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java @@ -0,0 +1,31 @@ +package io.byk.activemq.jms.topic; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +/** + * 主题生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class TopicPublisher { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String message) { + log.info("============> 发送 Topic 消息" + message); + Destination destination = new ActiveMQTopic(destinationName); + jmsMessagingTemplate.convertAndSend(destination, message); + } + +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java new file mode 100644 index 00000000..08ff083a --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java @@ -0,0 +1,23 @@ +package io.byk.activemq.jms.topic; + +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +/** + * 主题消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class TopicSubscriber { + @JmsListener(destination = ACTIVE_MQ_TOPIC, containerFactory = "myJmsContainerFactory") + public void receiveMessage(String message) { + log.info("<=========== 收到消息" + message); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java new file mode 100644 index 00000000..89db1847 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java @@ -0,0 +1,16 @@ +package io.byk.config; + +/** + * 常量类 + * + * @author boyunkai + * Created on 2021-02-05 + */ +public class ActiveMqConfig { + // 测试队列 + public static final String ACTIVE_MQ_QUEUE = "test.queue"; + // 测试主题 + public static final String ACTIVE_MQ_TOPIC = "test.topic"; + // 队列大小 + public static final Integer QUEUE_SIZE = 10; +} diff --git a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java index 75e13283..5042b9be 100644 --- a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java +++ b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java @@ -62,9 +62,9 @@ public void onMessage(Message message) { producer.send(message); } - Thread.sleep(2000); - session.close(); - conn.close(); +// Thread.sleep(2000); +// session.close(); +// conn.close(); } catch (Exception e) { diff --git a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java new file mode 100644 index 00000000..078ead67 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java @@ -0,0 +1,55 @@ +package io.order; + +import javax.annotation.Resource; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import io.order.service.ActiveMqQueueConsumer; +import io.order.service.ActiveMqQueueProducer; +import io.order.service.QueueConsumer; +import io.order.service.QueueProducer; +import lombok.extern.slf4j.Slf4j; + +/** + * 启动类 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@SpringBootApplication +@Slf4j +public class QueueApplication implements ApplicationRunner { + @Resource + QueueProducer queueProducer; + + @Resource + QueueConsumer queueConsumer; + + @Resource + ActiveMqQueueProducer activeMqQueueProducer; + + @Resource + ActiveMqQueueConsumer activeMqQueueConsumer; + // 测试队列 + public static final String ACTIVE_MQ_ORDER = "order.activeMQ"; + + public static void main(String[] args) { + SpringApplication.run(QueueApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) { + for (int i = 0; i < 10; i++) { + // log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队"); + activeMqQueueProducer.sendMessage(ACTIVE_MQ_ORDER, String.valueOf(i)); + } + try { + queueConsumer.receiveMessage(); + } catch (IllegalStateException exception) { + log.info(exception.getMessage()); + } + } +} diff --git a/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java new file mode 100644 index 00000000..3dc42ad4 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java @@ -0,0 +1,27 @@ +package io.order.mapper; +import org.apache.ibatis.annotations.Param; +import java.util.List; + +import io.order.model.Order; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author boyunkai + * Created on 2021-02-07 + */ +@Mapper +public interface OrderMapper { + int deleteByPrimaryKey(Integer id); + + int insert(Order record); + + int insertSelective(Order record); + + Order selectByPrimaryKey(Integer id); + + int updateByPrimaryKeySelective(Order record); + + int updateByPrimaryKey(Order record); + + List selectByStatusId(@Param("statusId")Integer statusId); +} \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/java/io/order/model/Order.java b/09mq/activemq-demo/src/main/java/io/order/model/Order.java new file mode 100644 index 00000000..151bf0e8 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/model/Order.java @@ -0,0 +1,31 @@ +package io.order.model; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * 订单表 + * + * @author boyunkai + * Created on 2021-02-07 + */ +@Data +@AllArgsConstructor +public class Order implements Serializable { + /** + * 主键ID + */ + private Integer id; + + /** + * 订单ID + */ + private String orderId; + + /** + * 状态 0-未处理 1-已处理 + */ + private Integer statusId; +} \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java new file mode 100644 index 00000000..f9f182df --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java @@ -0,0 +1,25 @@ +package io.order.service; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class ActiveMqQueueConsumer { + public static final String ACTIVE_MQ_ORDER = "order.activeMQ"; + + @JmsListener(destination = ACTIVE_MQ_ORDER) + public void receiveMessage(Order order) { + order.setStatusId(1); + log.info("<=========== 收到消息" + order.toString()); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java new file mode 100644 index 00000000..4d18ffd6 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java @@ -0,0 +1,36 @@ +package io.order.service; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import io.order.model.Order; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +@Getter +public class ActiveMqQueueProducer { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String orderId) { + Destination destination = new ActiveMQQueue(destinationName); + // STEP 1: 生成订单 + Order order = new Order(0, orderId, 0); + jmsMessagingTemplate.convertAndSend(destination, order); + log.info("============> 发送 Queue 消息" + order.toString()); + } + +} diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java new file mode 100644 index 00000000..05913e08 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java @@ -0,0 +1,94 @@ +package io.order.service; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import io.order.mapper.OrderMapper; +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueConsumer { + @Resource + OrderMapper orderMapper; + private static final ExecutorService executorService = Executors.newFixedThreadPool(3); + //最大重试次数 + private static final Integer TRY_TIMES = 6; + //重试间隔时间单位秒 + private static final Long INTERVAL_TIME = 100L; + + public void receiveMessage() { + // STEP 1: 获取为处理订单列表 + List orderList = orderMapper.selectByStatusId(0); + if (Objects.isNull(orderList) || orderList.isEmpty()) { + throw new IllegalStateException("所有订单已处理"); + } + for (Order order : orderList) { + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + // 重试策略 + int success = retryBuss(order); + if (success <= 0) { + log.info("订单" + order.getOrderId() + "=======>出队失败"); + } + } catch (InterruptedException exception) { + log.info("进程被打断"); + Thread.currentThread().interrupt(); + } + }); + } + } + } + + private int retryBuss(Order order) throws InterruptedException { + int retryNum = 1; + while (retryNum <= TRY_TIMES) { + try { + // 加锁解决并发,效率降低 + int success = updateOrder(order); + if (success > 0L) { + return 1; + } + retryNum++; + } catch (Exception e) { + retryNum++; + Thread.sleep(INTERVAL_TIME); + continue; + } + } + return 0; + } + + private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException { + // STEP 1: 校验订单 + if (Objects.isNull(order)) { + throw new IllegalAccessException("订单不存在"); + } + if (order.getStatusId() == 1) { + return 1; + } + // STEP 2: 插入订单 + order.setStatusId(1); + int success = orderMapper.updateByPrimaryKeySelective(order); + if (success <= 0L) { + throw new IllegalAccessException("更新订单失败"); + } + Thread.sleep(100); + log.info("订单" + order.getOrderId() + "=======>出队"); + return success; + } +} diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java new file mode 100644 index 00000000..66544647 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java @@ -0,0 +1,33 @@ +package io.order.service; + +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import io.order.mapper.OrderMapper; +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueProducer { + @Resource + OrderMapper orderMapper; + + public String sendMessage(String orderId) { + // STEP 1: 生成订单 + Order order = new Order(0, orderId, 0); + // STEP 2: 发送订单消息 + int success = orderMapper.insertSelective(order); + if (success <= 0L) { + throw new IllegalStateException("生成订单失败"); + } + return "订单" + order.getOrderId(); + } +} diff --git a/09mq/activemq-demo/src/main/resources/application.properties b/09mq/activemq-demo/src/main/resources/application.properties deleted file mode 100644 index 8b137891..00000000 --- a/09mq/activemq-demo/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml new file mode 100644 index 00000000..5c84b56a --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -0,0 +1,16 @@ +spring: + activemq: + broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml + user: admin + password: admin + pool: + enabled: false + packages: + trust-all: true + datasource: + url: jdbc:mysql://localhost:3306/java_study?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC + username: root + password: + driver-class-name: com.mysql.cj.jdbc.Driver +mybatis: + mapper-locations: classpath:mapper/*.xml diff --git a/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml new file mode 100644 index 00000000..8b6f267b --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml @@ -0,0 +1,80 @@ + + + + + + + + + + + + + id, order_id, status_id + + + + + delete from order_list + where id = #{id,jdbcType=INTEGER} + + + + insert into order_list (order_id, status_id) + values (#{orderId,jdbcType=VARCHAR}, #{statusId,jdbcType=INTEGER}) + + + + insert into order_list + + + order_id, + + + status_id, + + + + + #{orderId,jdbcType=VARCHAR}, + + + #{statusId,jdbcType=INTEGER}, + + + + + + update order_list + + + order_id = #{orderId,jdbcType=VARCHAR}, + + + status_id = #{statusId,jdbcType=INTEGER}, + + + where id = #{id,jdbcType=INTEGER} + + + + update order_list + set order_id = #{orderId,jdbcType=VARCHAR}, + status_id = #{statusId,jdbcType=INTEGER} + where id = #{id,jdbcType=INTEGER} + + + + + \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/resources/order_list.sql b/09mq/activemq-demo/src/main/resources/order_list.sql new file mode 100644 index 00000000..685cf3b3 --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/order_list.sql @@ -0,0 +1,31 @@ +/* + Navicat Premium Data Transfer + + Source Server : root + Source Server Type : MySQL + Source Server Version : 80022 + Source Host : localhost:3306 + Source Schema : java_study + + Target Server Type : MySQL + Target Server Version : 80022 + File Encoding : 65001 + + Date: 08/02/2021 09:45:24 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for order_list +-- ---------------------------- +DROP TABLE IF EXISTS `order_list`; +CREATE TABLE `order_list` ( + `id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID', + `order_id` varchar(100) NOT NULL COMMENT '订单ID', + `status_id` int NOT NULL COMMENT '状态 0-未处理 1-已处理', + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=2951 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表'; + +SET FOREIGN_KEY_CHECKS = 1;