diff --git a/02nio/nio01/pom.xml b/02nio/nio01/pom.xml
index e42a62f2..3ae38c66 100644
--- a/02nio/nio01/pom.xml
+++ b/02nio/nio01/pom.xml
@@ -59,6 +59,12 @@
netty-all
4.1.51.Final
+
+ io.netty
+ netty-all
+ 4.1.55.Final
+ compile
+
diff --git a/02nio/nio01/src/main/java/Main.java b/02nio/nio01/src/main/java/Main.java
index b07dca92..35cc141f 100644
--- a/02nio/nio01/src/main/java/Main.java
+++ b/02nio/nio01/src/main/java/Main.java
@@ -10,7 +10,7 @@
public class Main {
public static void main(String[] args) {
- Map map = new HashMap<>();
+ Map map = new HashMap();
map.put("1", HttpServer01.class);
map.put("2", HttpServer02.class);
map.put("3", HttpServer03.class);
diff --git a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java
index b4f8507d..b6d24957 100644
--- a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java
+++ b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java
@@ -4,25 +4,25 @@
import java.util.List;
public class ForeachDemo {
-
- private int x=1;
-
+
+ private int x = 1;
+
public static void main(String[] args) {
-
+
ForeachDemo demo = new ForeachDemo();
-
+
demo.test();
-
+
System.out.println(demo.x);
}
-
+
private void test() {
- List list = Arrays.asList(1,2);
+ List list = Arrays.asList(1, 2);
int y = 1;
list.forEach(e -> {
- x=2;
- //y=2; // can't be compiled
+ x = 2;
+// y = 2; // can't be compiled
});
}
-
+
}
diff --git a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java
index 916819d6..34b8098e 100644
--- a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java
+++ b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java
@@ -3,19 +3,20 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Optional;
-public class LambdaDemo {
-
- public static void main(String args[]){
+public class LambdaDemo {
+
+ public static void main(String args[]) {
LambdaDemo demo = new LambdaDemo();
-
+
MathOperation op = new MathOperation() {
@Override
public Integer operation(int a, int b) {
return 1;
}
};
-
+
MathOperation op1 = (a, b) -> 1;
@@ -33,55 +34,56 @@ public Integer operation(int a, int b) {
MathOperation subtraction = (int a, int b) -> a - b + 1.0;
// 大括号中的返回语句
- MathOperation multiplication = (int a, int b) -> {
+ MathOperation multiplication = (int a, int b) -> {
int c = 1000;
- return a * b + c;
+ return a * b + c;
};
-
+
// 没有大括号及返回语句
MathOperation division = (int a, int b) -> a / b;
-
+
System.out.println("10 + 5 = " + demo.operate(10, 5, addition));
System.out.println("10 - 5 = " + demo.operate(10, 5, subtraction));
System.out.println("10 x 5 = " + demo.operate(10, 5, multiplication));
System.out.println("10 / 5 = " + demo.operate(10, 5, division));
-
+
//System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> new Double(Math.pow(a,b)).intValue()));
-
- System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a,b)));
-
+
+ System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a, b)));
+
// 不用括号
GreetingService greetService1 = message ->
System.out.println("Hello " + message);
-
+
// 用括号
+
GreetingService greetService2 = (message) -> {
System.out.println(message);
};
GreetingService greetService3 = System.out::println;
-
- Arrays.asList(1,2,3).forEach( x -> System.out.println(x+3));
- Arrays.asList(1,2,3).forEach( LambdaDemo::println );
-
+
+ Arrays.asList(1, 2, 3).forEach(x -> System.out.println(x + 3));
+ Arrays.asList(1, 2, 3).forEach(LambdaDemo::println);
+
greetService1.sayMessage("kimmking");
greetService2.sayMessage("Java");
}
-
+
private static void println(int x) {
- System.out.println(x+3);
+ System.out.println(x + 3);
}
-
+
interface MathOperation {
T operation(int a, int b); // 返回类型+函数名+参数类型的列表
}
-
+
interface GreetingService {
void sayMessage(String message);
}
-
- private T operate(int a, int b, MathOperation mathOperation){
+
+ private T operate(int a, int b, MathOperation mathOperation) {
return mathOperation.operation(a, b);
}
-
+
}
diff --git a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java
index eb4267d8..d9d40afd 100644
--- a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java
+++ b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java
@@ -3,20 +3,22 @@
import com.alibaba.fastjson.JSON;
import java.io.IOException;
+
import java.util.*;
import java.util.stream.Collectors;
public class StreamDemo {
-
+
public static void main(String[] args) throws IOException {
-
- List list = Arrays.asList(4,2,3,5,1,2,2,7,6);
+
+ List list = Arrays.asList(4, 2, 3, 5, 1, 2, 2, 7, 6);
print(list);
-
+
// Optional
Optional first = list.stream().findFirst();
System.out.println(first.map(i -> i * 100).orElse(100));
+
int sum = list.stream().filter( i -> i<4).distinct().reduce(0,(a,b)->a+b);
System.out.println("sum="+sum);
@@ -24,22 +26,39 @@ public static void main(String[] args) throws IOException {
//Map map1 = list.stream().collect(Collectors.toMap(a->a,a->(a+1)));
Map map = list.stream().parallel().collect(Collectors.toMap(a->a,a->(a+1),(a,b)->a, LinkedHashMap::new));
print(map);
-
-
+
+
map.forEach((k, v) -> System.out.println("key:value = " + k + ":" + v));
- List list1 = map.entrySet().parallelStream().map(e -> e.getKey()+e.getValue()).collect(Collectors.toList());
+ List list1 =
+ map.entrySet().parallelStream().map(e -> e.getKey() + e.getValue()).collect(Collectors.toList());
print(list1);
-
+
// parallelStream()
-
+
// 总结:
// 1. Fluent API:继续Stream
// 2. 终止操作:得到结果
-
-
+
+ test();
+ test2();
+ }
+
+ static int one;
+
+ static void test() {
+ List b = new ArrayList<>();
+ b.add(1);
+ b.forEach((x) -> one = 2);
+ System.out.println("test" + one);
}
-
-
+
+ static void test2() {
+ List b = new ArrayList<>();
+ b.add(1);
+ b.forEach(x -> x = 2);
+ System.out.println("test" + b.get(0));
+ }
+
private static void print(Object obj) {
System.out.println(JSON.toJSONString(obj));
}
diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md
new file mode 100644
index 00000000..fa7c6aab
--- /dev/null
+++ b/09mq/activemq-demo/README.md
@@ -0,0 +1,42 @@
+
+## Week13 周四作业:
+### 1. [(必做)搭建 ActiveMQ 服务,基于 JMS,写代码分别实现对于 queue 和 topic 的消息生产和消费,代码提交到 github。](src/main/java/io/byk/activemq/jms)
+
+使用`JmsMessagingTemplate`模板类来实现
+
+### 2. [(选做)基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue/order)
+
+> - 一个程序往表里写新订单,标记状态为未处理 (status=0);
+> - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1;
+> - (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。
+
+#### 解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题
+
+在`application.yml`文件下添加配置信息:
+
+```sh
+mybatis:
+ mapper-locations: classpath:mapper/*.xml
+```
+
+#### ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near
+
+建议不要在创建表的过程中使用mysql保留字,避免后期造成麻烦
+
+### 3.[(选做)将上述订单处理场景,改成使用 ActiveMQ 发送消息处理模式。](src/main/java/io/byk/queue/order)
+
+#### ActiveMQ序列化异常Forbidden class ! This class is not trusted to be serialized as ObjectMessage payload
+
+在`application.yml`文件下添加配置信息:
+
+```
+spring:
+ activemq:
+ broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml
+ user: admin
+ password: admin
+ pool:
+ enabled: false
+ packages:
+ trust-all: true
+```
\ No newline at end of file
diff --git a/09mq/activemq-demo/pom.xml b/09mq/activemq-demo/pom.xml
index c3352476..329ad1f6 100644
--- a/09mq/activemq-demo/pom.xml
+++ b/09mq/activemq-demo/pom.xml
@@ -5,7 +5,7 @@
org.springframework.boot
spring-boot-starter-parent
- 2.3.0.RELEASE
+ 2.2.6.RELEASE
io.kimmking.javacourse.mq
@@ -24,12 +24,6 @@
spring-boot-starter
-
- org.apache.activemq
- activemq-all
- 5.16.0
-
-
org.projectlombok
lombok
@@ -40,6 +34,37 @@
spring-boot-starter-test
test
+
+
+ org.springframework.boot
+ spring-boot-starter-activemq
+
+
+
+ org.mybatis
+ mybatis
+ 3.5.6
+
+
+ org.mybatis.spring.boot
+ mybatis-spring-boot-starter
+ 2.1.4
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+
+
+
+ mysql
+ mysql-connector-java
+ runtime
+
+
+ org.mybatis
+ mybatis-spring
+ 2.0.6
+
@@ -58,5 +83,4 @@
-
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java
new file mode 100644
index 00000000..cd88d03a
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java
@@ -0,0 +1,49 @@
+package io.byk.activemq.jms;
+
+
+import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE;
+import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC;
+
+import javax.annotation.Resource;
+
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import io.byk.activemq.jms.queue.QueueProducer;
+import io.byk.activemq.jms.topic.TopicPublisher;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 启动类
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@SpringBootApplication
+@Slf4j
+public class JmsActiveMqApplication implements ApplicationRunner {
+ @Resource
+ private QueueProducer queueProducer;
+ @Resource
+ private TopicPublisher topicPublisher;
+
+ public static void main(String[] args) {
+ SpringApplication.run(JmsActiveMqApplication.class, args);
+ }
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ // 测试队列
+ for (int i = 0; i < 10; i++) {
+ String message = "队列消息" + i;
+ queueProducer.sendMessage(ACTIVE_MQ_QUEUE, message);
+ }
+ // 测试主题
+ for (int i = 0; i < 10; i++) {
+ String message = "主题消息" + i;
+ topicPublisher.sendMessage(ACTIVE_MQ_TOPIC, message);
+ }
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java
new file mode 100644
index 00000000..d1e657cc
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java
@@ -0,0 +1,23 @@
+package io.byk.activemq.jms.queue;
+
+import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE;
+
+import org.springframework.jms.annotation.JmsListener;
+import org.springframework.stereotype.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列消费者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class QueueConsumer {
+ @JmsListener(destination = ACTIVE_MQ_QUEUE)
+ public void receiveMessage(String message) {
+ log.info("<=========== 收到消息" + message);
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java
new file mode 100644
index 00000000..5e7b0f6f
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java
@@ -0,0 +1,33 @@
+package io.byk.activemq.jms.queue;
+
+import javax.annotation.Resource;
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.springframework.jms.core.JmsMessagingTemplate;
+import org.springframework.stereotype.Service;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列生产者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+@Getter
+public class QueueProducer {
+ // JMS 模板
+ @Resource
+ private JmsMessagingTemplate jmsMessagingTemplate;
+
+ public void sendMessage(String destinationName, String message) {
+ log.info("============> 发送 Queue 消息" + message);
+ Destination destination = new ActiveMQQueue(destinationName);
+ jmsMessagingTemplate.convertAndSend(destination, message);
+ }
+
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java
new file mode 100644
index 00000000..2b1a6621
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java
@@ -0,0 +1,28 @@
+package io.byk.activemq.jms.topic;
+
+import javax.jms.ConnectionFactory;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.config.JmsListenerContainerFactory;
+import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
+
+/**
+ * ContainerFactor 配置
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Configuration
+public class JmsContainerConfig {
+
+ @Bean
+ public JmsListenerContainerFactory> myJmsContainerFactory(@Qualifier("jmsConnectionFactory")
+ ConnectionFactory connectionFactory) {
+ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
+ factory.setConnectionFactory(connectionFactory);
+ factory.setPubSubDomain(true);
+ return factory;
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java
new file mode 100644
index 00000000..a2cc5437
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java
@@ -0,0 +1,31 @@
+package io.byk.activemq.jms.topic;
+
+import javax.annotation.Resource;
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.springframework.jms.core.JmsMessagingTemplate;
+import org.springframework.stereotype.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 主题生产者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class TopicPublisher {
+ // JMS 模板
+ @Resource
+ private JmsMessagingTemplate jmsMessagingTemplate;
+
+ public void sendMessage(String destinationName, String message) {
+ log.info("============> 发送 Topic 消息" + message);
+ Destination destination = new ActiveMQTopic(destinationName);
+ jmsMessagingTemplate.convertAndSend(destination, message);
+ }
+
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java
new file mode 100644
index 00000000..08ff083a
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java
@@ -0,0 +1,23 @@
+package io.byk.activemq.jms.topic;
+
+import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC;
+
+import org.springframework.jms.annotation.JmsListener;
+import org.springframework.stereotype.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 主题消费者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class TopicSubscriber {
+ @JmsListener(destination = ACTIVE_MQ_TOPIC, containerFactory = "myJmsContainerFactory")
+ public void receiveMessage(String message) {
+ log.info("<=========== 收到消息" + message);
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java
new file mode 100644
index 00000000..89db1847
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java
@@ -0,0 +1,16 @@
+package io.byk.config;
+
+/**
+ * 常量类
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+public class ActiveMqConfig {
+ // 测试队列
+ public static final String ACTIVE_MQ_QUEUE = "test.queue";
+ // 测试主题
+ public static final String ACTIVE_MQ_TOPIC = "test.topic";
+ // 队列大小
+ public static final Integer QUEUE_SIZE = 10;
+}
diff --git a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java
index 75e13283..5042b9be 100644
--- a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java
+++ b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java
@@ -62,9 +62,9 @@ public void onMessage(Message message) {
producer.send(message);
}
- Thread.sleep(2000);
- session.close();
- conn.close();
+// Thread.sleep(2000);
+// session.close();
+// conn.close();
} catch (Exception e) {
diff --git a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java
new file mode 100644
index 00000000..078ead67
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java
@@ -0,0 +1,55 @@
+package io.order;
+
+import javax.annotation.Resource;
+
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import io.order.service.ActiveMqQueueConsumer;
+import io.order.service.ActiveMqQueueProducer;
+import io.order.service.QueueConsumer;
+import io.order.service.QueueProducer;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 启动类
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@SpringBootApplication
+@Slf4j
+public class QueueApplication implements ApplicationRunner {
+ @Resource
+ QueueProducer queueProducer;
+
+ @Resource
+ QueueConsumer queueConsumer;
+
+ @Resource
+ ActiveMqQueueProducer activeMqQueueProducer;
+
+ @Resource
+ ActiveMqQueueConsumer activeMqQueueConsumer;
+ // 测试队列
+ public static final String ACTIVE_MQ_ORDER = "order.activeMQ";
+
+ public static void main(String[] args) {
+ SpringApplication.run(QueueApplication.class, args);
+ }
+
+ @Override
+ public void run(ApplicationArguments args) {
+ for (int i = 0; i < 10; i++) {
+ // log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队");
+ activeMqQueueProducer.sendMessage(ACTIVE_MQ_ORDER, String.valueOf(i));
+ }
+ try {
+ queueConsumer.receiveMessage();
+ } catch (IllegalStateException exception) {
+ log.info(exception.getMessage());
+ }
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java
new file mode 100644
index 00000000..3dc42ad4
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java
@@ -0,0 +1,27 @@
+package io.order.mapper;
+import org.apache.ibatis.annotations.Param;
+import java.util.List;
+
+import io.order.model.Order;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author boyunkai
+ * Created on 2021-02-07
+ */
+@Mapper
+public interface OrderMapper {
+ int deleteByPrimaryKey(Integer id);
+
+ int insert(Order record);
+
+ int insertSelective(Order record);
+
+ Order selectByPrimaryKey(Integer id);
+
+ int updateByPrimaryKeySelective(Order record);
+
+ int updateByPrimaryKey(Order record);
+
+ List selectByStatusId(@Param("statusId")Integer statusId);
+}
\ No newline at end of file
diff --git a/09mq/activemq-demo/src/main/java/io/order/model/Order.java b/09mq/activemq-demo/src/main/java/io/order/model/Order.java
new file mode 100644
index 00000000..151bf0e8
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/model/Order.java
@@ -0,0 +1,31 @@
+package io.order.model;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * 订单表
+ *
+ * @author boyunkai
+ * Created on 2021-02-07
+ */
+@Data
+@AllArgsConstructor
+public class Order implements Serializable {
+ /**
+ * 主键ID
+ */
+ private Integer id;
+
+ /**
+ * 订单ID
+ */
+ private String orderId;
+
+ /**
+ * 状态 0-未处理 1-已处理
+ */
+ private Integer statusId;
+}
\ No newline at end of file
diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java
new file mode 100644
index 00000000..f9f182df
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java
@@ -0,0 +1,25 @@
+package io.order.service;
+
+import org.springframework.jms.annotation.JmsListener;
+import org.springframework.stereotype.Service;
+
+import io.order.model.Order;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列消费者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class ActiveMqQueueConsumer {
+ public static final String ACTIVE_MQ_ORDER = "order.activeMQ";
+
+ @JmsListener(destination = ACTIVE_MQ_ORDER)
+ public void receiveMessage(Order order) {
+ order.setStatusId(1);
+ log.info("<=========== 收到消息" + order.toString());
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java
new file mode 100644
index 00000000..4d18ffd6
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java
@@ -0,0 +1,36 @@
+package io.order.service;
+
+import javax.annotation.Resource;
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.springframework.jms.core.JmsMessagingTemplate;
+import org.springframework.stereotype.Service;
+
+import io.order.model.Order;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列生产者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+@Getter
+public class ActiveMqQueueProducer {
+ // JMS 模板
+ @Resource
+ private JmsMessagingTemplate jmsMessagingTemplate;
+
+ public void sendMessage(String destinationName, String orderId) {
+ Destination destination = new ActiveMQQueue(destinationName);
+ // STEP 1: 生成订单
+ Order order = new Order(0, orderId, 0);
+ jmsMessagingTemplate.convertAndSend(destination, order);
+ log.info("============> 发送 Queue 消息" + order.toString());
+ }
+
+}
diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java
new file mode 100644
index 00000000..05913e08
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java
@@ -0,0 +1,94 @@
+package io.order.service;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Resource;
+
+import org.springframework.stereotype.Service;
+
+import io.order.mapper.OrderMapper;
+import io.order.model.Order;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列消费者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class QueueConsumer {
+ @Resource
+ OrderMapper orderMapper;
+ private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
+ //最大重试次数
+ private static final Integer TRY_TIMES = 6;
+ //重试间隔时间单位秒
+ private static final Long INTERVAL_TIME = 100L;
+
+ public void receiveMessage() {
+ // STEP 1: 获取为处理订单列表
+ List orderList = orderMapper.selectByStatusId(0);
+ if (Objects.isNull(orderList) || orderList.isEmpty()) {
+ throw new IllegalStateException("所有订单已处理");
+ }
+ for (Order order : orderList) {
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ // 重试策略
+ int success = retryBuss(order);
+ if (success <= 0) {
+ log.info("订单" + order.getOrderId() + "=======>出队失败");
+ }
+ } catch (InterruptedException exception) {
+ log.info("进程被打断");
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+ }
+ }
+
+ private int retryBuss(Order order) throws InterruptedException {
+ int retryNum = 1;
+ while (retryNum <= TRY_TIMES) {
+ try {
+ // 加锁解决并发,效率降低
+ int success = updateOrder(order);
+ if (success > 0L) {
+ return 1;
+ }
+ retryNum++;
+ } catch (Exception e) {
+ retryNum++;
+ Thread.sleep(INTERVAL_TIME);
+ continue;
+ }
+ }
+ return 0;
+ }
+
+ private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException {
+ // STEP 1: 校验订单
+ if (Objects.isNull(order)) {
+ throw new IllegalAccessException("订单不存在");
+ }
+ if (order.getStatusId() == 1) {
+ return 1;
+ }
+ // STEP 2: 插入订单
+ order.setStatusId(1);
+ int success = orderMapper.updateByPrimaryKeySelective(order);
+ if (success <= 0L) {
+ throw new IllegalAccessException("更新订单失败");
+ }
+ Thread.sleep(100);
+ log.info("订单" + order.getOrderId() + "=======>出队");
+ return success;
+ }
+}
diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java
new file mode 100644
index 00000000..66544647
--- /dev/null
+++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java
@@ -0,0 +1,33 @@
+package io.order.service;
+
+import javax.annotation.Resource;
+
+import org.springframework.stereotype.Service;
+
+import io.order.mapper.OrderMapper;
+import io.order.model.Order;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 队列生产者
+ *
+ * @author boyunkai
+ * Created on 2021-02-05
+ */
+@Service
+@Slf4j
+public class QueueProducer {
+ @Resource
+ OrderMapper orderMapper;
+
+ public String sendMessage(String orderId) {
+ // STEP 1: 生成订单
+ Order order = new Order(0, orderId, 0);
+ // STEP 2: 发送订单消息
+ int success = orderMapper.insertSelective(order);
+ if (success <= 0L) {
+ throw new IllegalStateException("生成订单失败");
+ }
+ return "订单" + order.getOrderId();
+ }
+}
diff --git a/09mq/activemq-demo/src/main/resources/application.properties b/09mq/activemq-demo/src/main/resources/application.properties
deleted file mode 100644
index 8b137891..00000000
--- a/09mq/activemq-demo/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml
new file mode 100644
index 00000000..5c84b56a
--- /dev/null
+++ b/09mq/activemq-demo/src/main/resources/application.yml
@@ -0,0 +1,16 @@
+spring:
+ activemq:
+ broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml
+ user: admin
+ password: admin
+ pool:
+ enabled: false
+ packages:
+ trust-all: true
+ datasource:
+ url: jdbc:mysql://localhost:3306/java_study?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
+ username: root
+ password:
+ driver-class-name: com.mysql.cj.jdbc.Driver
+mybatis:
+ mapper-locations: classpath:mapper/*.xml
diff --git a/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml
new file mode 100644
index 00000000..8b6f267b
--- /dev/null
+++ b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml
@@ -0,0 +1,80 @@
+
+
+
+
+
+
+
+
+
+
+
+
+ id, order_id, status_id
+
+
+
+
+ delete from order_list
+ where id = #{id,jdbcType=INTEGER}
+
+
+
+ insert into order_list (order_id, status_id)
+ values (#{orderId,jdbcType=VARCHAR}, #{statusId,jdbcType=INTEGER})
+
+
+
+ insert into order_list
+
+
+ order_id,
+
+
+ status_id,
+
+
+
+
+ #{orderId,jdbcType=VARCHAR},
+
+
+ #{statusId,jdbcType=INTEGER},
+
+
+
+
+
+ update order_list
+
+
+ order_id = #{orderId,jdbcType=VARCHAR},
+
+
+ status_id = #{statusId,jdbcType=INTEGER},
+
+
+ where id = #{id,jdbcType=INTEGER}
+
+
+
+ update order_list
+ set order_id = #{orderId,jdbcType=VARCHAR},
+ status_id = #{statusId,jdbcType=INTEGER}
+ where id = #{id,jdbcType=INTEGER}
+
+
+
+
+
\ No newline at end of file
diff --git a/09mq/activemq-demo/src/main/resources/order_list.sql b/09mq/activemq-demo/src/main/resources/order_list.sql
new file mode 100644
index 00000000..685cf3b3
--- /dev/null
+++ b/09mq/activemq-demo/src/main/resources/order_list.sql
@@ -0,0 +1,31 @@
+/*
+ Navicat Premium Data Transfer
+
+ Source Server : root
+ Source Server Type : MySQL
+ Source Server Version : 80022
+ Source Host : localhost:3306
+ Source Schema : java_study
+
+ Target Server Type : MySQL
+ Target Server Version : 80022
+ File Encoding : 65001
+
+ Date: 08/02/2021 09:45:24
+*/
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+-- Table structure for order_list
+-- ----------------------------
+DROP TABLE IF EXISTS `order_list`;
+CREATE TABLE `order_list` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
+ `order_id` varchar(100) NOT NULL COMMENT '订单ID',
+ `status_id` int NOT NULL COMMENT '状态 0-未处理 1-已处理',
+ PRIMARY KEY (`id`)
+) ENGINE=InnoDB AUTO_INCREMENT=2951 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表';
+
+SET FOREIGN_KEY_CHECKS = 1;