民治制作网站顺义广州网站建设
2026/2/12 14:35:41 网站建设 项目流程
民治制作网站,顺义广州网站建设,wordpress加帝国cms,wordpress isux主题整个工具的代码都在Gitee或者Github地址内 gitee#xff1a;solomon-parent: 这个项目主要是总结了工作上遇到的问题以及学习一些框架用于整合例如:rabbitMq、reids、Mqtt、S3协议的文件服务器、mongodb github#xff1a;GitHub - ZeroNing/solomon-parent: 这个项目主要是…整个工具的代码都在Gitee或者Github地址内giteesolomon-parent: 这个项目主要是总结了工作上遇到的问题以及学习一些框架用于整合例如:rabbitMq、reids、Mqtt、S3协议的文件服务器、mongodbgithubGitHub - ZeroNing/solomon-parent: 这个项目主要是总结了工作上遇到的问题以及学习一些框架用于整合例如:rabbitMq、reids、Mqtt、S3协议的文件服务器、mongodb需要引入的JAR包dependencies dependency groupIdorg.springframework.integration/groupId artifactIdspring-integration-mqtt/artifactId /dependency /dependencies需要引入的JAR包(版本根据自身要求使用本教程用的版本均为最新)1.新增MessageListener注解Target(value { ElementType.FIELD, ElementType.TYPE }) Retention(RetentionPolicy.RUNTIME) Component Conditional(MqttCondition.class) public interface MessageListener { /** * 主题 */ String[] topics(); /** * 消息质量 */ int qos() default 0; /** * 允许订阅的租户范围 */ String[] tenantRange() default StrUtil.EMPTY; }订阅的租户范围为空,则是所有租户可以订阅消费2.异常编码public interface MqttErrorCode extends BaseExceptionCode { String CLIENT_IS_NULLCLIENT_IS_NULL; }public interface BaseExceptionCode { /** * 切换数据源失败 */ String FAILED_TO_SWITCH_DATA_SOURCE FAILED_TO_SWITCH_DATA_SOURCE; /** * 请求方式错误 */ String REQUEST_METHOD_ERROR S9991; /** * 非法请求 */ String BAD_REQUEST S9992; /** * 服务调用失败 */ String SERVICE_CALL_ERRORS9993; /** * 系统限流 */ String SYSTEM_LIMITING S9994; /** * 系统熔断 */ String SYSTEM_FUSING S10000; /** * 参数错误 */ String PARAMETER_ERROR_CODE S9995; /** * 参数异常 */ String PARAMETER_EXCEPTION_CODE S9996; /** * 无权访问 */ String NO_ACCESS_EXCEPTION_CODE S9997; /** * 对不起请勿重复请求 */ String ACCESS_EXCEPTION_CODE S9998; /** * 系统异常请联系客服人员后稍后在试 */ String BASE_EXCEPTION_CODE S9999; /** * 文件不存在稍后在试 */ String FILE_IS_NOT_EXIST_EXCEPTION_CODE F9999; /** * 对不起登录失败账号密码错误,稍后再试 */ String LOGIN_EXCEPTION_CODE L0000; /** * 登陆已过期请重新登录 */ String LOGIN_TOKEN_EXCEPTION_CODE L0001; /** * id不能为空 */ String ID_NOT_NULLID_NOT_NULL; /** * mongo配置文件为空 */ String MONGODB_PROPERTIES_IS_NULLMONGODB_PROPERTIES_IS_NULL; /** * 文件类型不在允许范围内 */ String FILE_TYPE_NOT_WITHIN_THE_ALLOWABLE_RANGE FILE_TYPE_NOT_WITHIN_THE_ALLOWABLE_RANGE; /** * 上传文件大小超过最大限制 */ String FILE_UPLOAD_MAX_SIZE FILE_UPLOAD_MAX_SIZE; String FILE_HIGH_RISK FILE_HIGH_RISK; }3.Mqtt开关控制public class MqttCondition implements Condition { Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { String enabled ValidateUtils.getOrDefault(context.getEnvironment().getProperty(mqtt.enabled),true); return BooleanUtil.toBoolean(enabled); } }4.Mqtt配置文件4.1 Mqtt单个配置文件public class MqttProfile { /** * 用户名 */ private String userName; /** * 密码 */ private String password; /** * 连接 */ private String url; /** * 客户端的标识(不可重复,为空时侯用uuid) */ private String clientId; /** * 连接超时 */ private int completionTimeout 30; /** * 是否自动重连 */ private boolean automaticReconnect true; /** * 客户端掉线后,是否自动清除session */ private boolean cleanSession false; /** * 心跳时间 */ private int keepAliveInterval 60; /** * 遗嘱消息 */ private MqttWill will; /** * 最大未确认消息数量 */ private int maxInflight 10; /** * 重新连接之间等待的最长时间 */ private int maxReconnectDelay 12800; /** * 设置连接超时值,该值以秒为单位 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败. */ private int connectionTimeout MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT; /** * 设置执行器服务应等待的时间以秒为单位在强制终止之前终止。不建议更改除非您绝对确定需要否则该值。 */ private int executorServiceTimeout 1; /** * ssl连接是否验证证书 */ private boolean verifyCertificate false; public static class MqttWill implements Serializable { /** * 遗嘱主题 */ private String topic; /** * 遗嘱消息 */ private String message; /** * 遗嘱消息质量 */ private int qos; /** * 是否保留消息 */ private boolean retained; public boolean getRetained() { return retained; } public void setRetained(boolean retained) { this.retained retained; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic topic; } public String getMessage() { return message; } public void setMessage(String message) { this.message message; } public int getQos() { return qos; } public void setQos(int qos) { this.qos qos; } } public MqttWill getWill() { return will; } public void setWill(MqttWill will) { this.will will; } public int getKeepAliveInterval() { return keepAliveInterval; } public void setKeepAliveInterval(int keepAliveInterval) { this.keepAliveInterval keepAliveInterval; } public boolean getCleanSession() { return cleanSession; } public void setCleanSession(boolean cleanSession) { this.cleanSession cleanSession; } public boolean getAutomaticReconnect() { return automaticReconnect; } public void setAutomaticReconnect(boolean automaticReconnect) { this.automaticReconnect automaticReconnect; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password password; } public String getUrl() { return url; } public void setUrl(String url) { this.url url; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId clientId; } public int getCompletionTimeout() { return completionTimeout; } public void setCompletionTimeout(int completionTimeout) { this.completionTimeout completionTimeout; } public int getMaxInflight() { return maxInflight; } public void setMaxInflight(int maxInflight) { this.maxInflight maxInflight; } public int getMaxReconnectDelay() { return maxReconnectDelay; } public void setMaxReconnectDelay(int maxReconnectDelay) { this.maxReconnectDelay maxReconnectDelay; } public int getConnectionTimeout() { return connectionTimeout; } public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout connectionTimeout; } public int getExecutorServiceTimeout() { return executorServiceTimeout; } public void setExecutorServiceTimeout(int executorServiceTimeout) { this.executorServiceTimeout executorServiceTimeout; } public boolean getVerifyCertificate() { return verifyCertificate; } public void setVerifyCertificate(boolean verifyCertificate) { this.verifyCertificate verifyCertificate; } }4.2 多租户Mqtt配置文件ConfigurationProperties(mqtt) public class TenantMqttProfile { public MapString,MqttProfile tenant; //是否启用 private boolean enabled true; public boolean getEnabled() { return enabled; } public void setEnabled(boolean enabled) { this.enabled enabled; } public MapString, MqttProfile getTenant() { return tenant; } public void setTenant(MapString, MqttProfile tenant) { this.tenant tenant; } }5.Mqtt实体类public class BaseMqT implements Serializable { /** * 消费者数据 */ private T body; /** * 租户 */ private String tenantCode; /** * 消息ID */ private String msgId; public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId msgId; } public BaseMq(){ } public BaseMq(T body){ this.body body; } public T getBody() { return body; } public void setBody(T body) { this.body body; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode tenantCode; } }public class MqttModelT extends BaseMqT { private String topic; private boolean retained; private int qos; public int getQos() { return qos; } public void setQos(int qos) { this.qos qos; } public boolean getRetained() { return retained; } public void setRetained(boolean retained) { this.retained retained; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic topic; } public MqttModel(String tenantCode) { super(); setTenantCode(tenantCode); } public MqttModel(String tenantCode,String topic,T body) { super(body); this.topic topic; setTenantCode(tenantCode); } }6.Mqtt初始化接口public interface MqttInitService { void initMqttClient(String tenantCode, MqttProfile mqttProfile, ListObject clazzList) throws Exception; void initMqttClient(String tenantCode, MqttProfile mqttProfile) throws Exception; }public class DefaultMqttInitService implements MqttInitService { private final Logger logger LoggerUtils.logger(DefaultMqttInitService.class); private final MqttUtils utils; public DefaultMqttInitService(MqttUtils utils) { this.utils utils; } Override public void initMqttClient(String tenantCode, MqttProfile mqttProfile, ListObject clazzList) throws Exception { String url mqttProfile.getUrl().split(,)[0]; MqttClient mqttClient new MqttClient(url, ValidateUtils.getOrDefault(mqttProfile.getClientId(), UUID.randomUUID().toString())); MqttConnectOptions options utils.initMqttConnectOptions(mqttProfile); mqttClient.connect(options); utils.putOptionsMap(tenantCode,options); // 订阅主题 utils.subscribe(mqttClient,clazzList,tenantCode); //配置callback mqttClient.setCallback(new MqttCallbackExtended() { Override public void connectComplete(boolean reconnect, String serverURI) { logger.info(租户:{} 重连{},tenantCode,reconnect ? 成功 : 失败); if(reconnect){ for (Object abstractConsumer : clazzList) { MessageListener messageListener AnnotationUtil.getAnnotation(abstractConsumer.getClass(), MessageListener.class); if(ValidateUtils.isEmpty(messageListener) || ValidateUtils.isEmpty(messageListener.topics())){ continue; } try { for(String topic : messageListener.topics()){ topic SpringUtil.getElValue(topic); logger.info(租户:{} 重新订阅[{}]主题,tenantCode,topic); mqttClient.subscribe(topic, messageListener.qos(), (IMqttMessageListener) BeanUtil.copyProperties(abstractConsumer,abstractConsumer.getClass(), (String) null)); } } catch (MqttException e) { logger.error(重连重新订阅主题失败,异常为:,e); } } } } Override public void connectionLost(Throwable cause) { logger.info(租户:{} 断开连接,异常为:,tenantCode,cause); } Override public void messageArrived(String topic, MqttMessage message) throws Exception { } Override public void deliveryComplete(IMqttDeliveryToken token) { } }); //保存client utils.putClient(tenantCode,mqttClient); } Override public void initMqttClient(String tenantCode, MqttProfile mqttProfile) throws Exception { this.initMqttClient(tenantCode,mqttProfile,new ArrayList(SpringUtil.getBeansWithAnnotation(MessageListener.class).values())); } }7.Mqtt工具类public interface SendServiceT extends BaseMq { /** * 发送消息 */ void send(T data) throws Exception; /** * 发送延缓信息 */ void sendDelay(T data, long delay) throws Exception; /** * 发送消息,并设置消息过期时间 */ void sendExpiration(T data, long expiration) throws Exception; }Configuration public class MqttUtils implements SendServiceMqttModel? { private final Logger logger LoggerUtils.logger(MqttUtils.class); private final MapString,MqttClient clientMap new HashMap(); private final MapString,MqttConnectOptions optionsMap new HashMap(); public MapString, MqttConnectOptions getOptionsMap() { return optionsMap; } public void putOptionsMap(String tenantCode, MqttConnectOptions options) { this.optionsMap.put(tenantCode,options); } public MapString, MqttClient getClientMap() { return clientMap; } public void putClient(String tenantCode,MqttClient client) { this.clientMap.put(tenantCode, client); } /** * 发送消息 * param data 消息内容 */ Override public void send(MqttModel? data) throws Exception { // 获取客户端实例 try { // 转换消息为json字符串 String json JSONUtil.toJsonStr(data); getClient(data.getTenantCode()).getTopic(data.getTopic()).publish(json.getBytes(StandardCharsets.UTF_8), data.getQos(), data.getRetained()); } catch (MqttException e) { logger.error(String.format(MQTT: 主题[%s]发送消息失败, data.getTopic())); } } Override public void sendDelay(MqttModel? data, long delay) throws Exception { send(data); } Override public void sendExpiration(MqttModel? data, long expiration) throws Exception { send(data); } /** * 订阅消息 * param tenantCode 租户编码 * param topic 主题 * param qos 消息质量 * param consumer 消费者 */ public void subscribe(String tenantCode,String topic,int qos, IMqttMessageListener consumer) throws MqttException, BaseException { if(ValidateUtils.isEmpty(topic)){ return; } getClient(tenantCode).subscribe(topic, qos,consumer); } /** * 订阅消息 * param client mqtt连接 */ public void subscribe(MqttClient client,String tenantCode) throws MqttException { ListObject clazzList new ArrayList(SpringUtil.getBeansWithAnnotation(MessageListener.class).values()); this.subscribe(client,clazzList,tenantCode); } /** * 订阅消息 * param client mqtt连接 */ public void subscribe(MqttClient client,ListObject clazzList,String tenantCode) throws MqttException { if (ValidateUtils.isNotEmpty(clazzList)) { for (Object abstractConsumer : clazzList) { MessageListener messageListener AnnotationUtil.getAnnotation(abstractConsumer.getClass(), MessageListener.class); if (ValidateUtils.isEmpty(messageListener) || ValidateUtils.isEmpty(messageListener.topics())) { continue; } ListString rangeList Lambda.toList(Arrays.asList(messageListener.tenantRange()), ValidateUtils::isNotEmpty, key-key); if(ValidateUtils.isEmpty(rangeList) || rangeList.contains(tenantCode)){ for (String topic : messageListener.topics()) { topic SpringUtil.getElValue(topic); AbstractConsumer?,? consumer (AbstractConsumer?,?) BeanUtil.copyProperties(abstractConsumer,abstractConsumer.getClass(), (String) null); client.subscribe(topic, messageListener.qos(), consumer); } } else { logger.info({}租户,{}只支持{}范围,tenantCode,abstractConsumer.getClass().getSimpleName(),rangeList.toArray()); } } } } /** * 取消订阅 * param topic 主题 */ public void unsubscribe(String tenantCode,String[] topic) throws MqttException, BaseException { if(ValidateUtils.isEmpty(topic)){ return; } getClient(tenantCode).unsubscribe(topic); } /** * 关闭连接 */ public void disconnect(String tenantCode) throws MqttException, BaseException { getClient(tenantCode).disconnect(); } /** * 重新连接 */ public void reconnect(String tenantCode) throws MqttException, BaseException { MqttClient client getClient(tenantCode); if(!client.isConnected()){ client.connect(getOptionsMap().get(tenantCode)); subscribe(client,tenantCode); } } public void reconnect(String tenantCode,MqttProfile mqttProfile) throws MqttException, BaseException { MqttClient client getClient(tenantCode); if(!client.isConnected()){ client.connect(initMqttConnectOptions(mqttProfile)); subscribe(client,tenantCode); } } private MqttClient getClient(String tenantCode) throws BaseException { MqttClient client getClientMap().get(tenantCode); if(ValidateUtils.isEmpty(client)){ throw new BaseException(MqttErrorCode.CLIENT_IS_NULL,tenantCode); } return client; } public MqttConnectOptions initMqttConnectOptions(MqttProfile mqttProfile) { MqttConnectOptions mqttConnectOptions new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProfile.getUserName()); mqttConnectOptions.setPassword(mqttProfile.getPassword().toCharArray()); mqttConnectOptions.setServerURIs(mqttProfile.getUrl().split(,)); //设置同一时间可以发送的最大未确认消息数量 mqttConnectOptions.setMaxInflight(mqttProfile.getMaxInflight()); //设置超时时间 mqttConnectOptions.setConnectionTimeout(mqttProfile.getCompletionTimeout()); //设置自动重连 mqttConnectOptions.setAutomaticReconnect(mqttProfile.getAutomaticReconnect()); //cleanSession 设为 true;当客户端掉线时;服务器端会清除 客户端session;重连后 客户端会有一个新的session,cleanSession // 设为false客户端掉线后 服务器端不会清除session当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息 mqttConnectOptions.setCleanSession(mqttProfile.getCleanSession()); // 设置会话心跳时间 单位为秒 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(mqttProfile.getKeepAliveInterval()); // 设置重新连接之间等待的最长时间 mqttConnectOptions.setMaxReconnectDelay(mqttProfile.getMaxReconnectDelay()); // 设置连接超时值,该值以秒为单位 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败. mqttConnectOptions.setConnectionTimeout(mqttProfile.getConnectionTimeout()); // 设置执行器服务应等待的时间以秒为单位在强制终止之前终止.不建议更改除非您绝对确定需要否则该值. mqttConnectOptions.setExecutorServiceTimeout(mqttProfile.getExecutorServiceTimeout()); //设置遗嘱消息 MqttWill will mqttProfile.getWill(); if (ValidateUtils.isNotEmpty(will)) { mqttConnectOptions.setWill(will.getTopic(), will.getMessage().getBytes(), will.getQos(), will.getRetained()); } if(!mqttProfile.getVerifyCertificate()){ try { // 创建信任所有证书的 SSLContext SSLContext sslContext SSLContext.getInstance(TLS); sslContext.init(null, new TrustManager[]{new X509TrustManager() { public X509Certificate[] getAcceptedIssuers() { return null; } public void checkClientTrusted(X509Certificate[] certs, String authType) {} public void checkServerTrusted(X509Certificate[] certs, String authType) {} }}, new java.security.SecureRandom()); mqttConnectOptions.setSocketFactory(sslContext.getSocketFactory()); // 可选设置主机名验证为忽略 (Paho 1.2.0支持) // options.setSSLHostnameVerifier((hostname, session) - true); } catch (NoSuchAlgorithmException | KeyManagementException e) { throw new RuntimeException(Error setting up SSL for MQTT, e); } } return mqttConnectOptions; } }8.通用Mqtt消息处理器public interface CommonMqttMessageListenerT,R,M extends BaseMqT { /** * 消费方法 */ R handleMessage(T body) throws Exception; /** * 保存消费成功/失败的消息 */ void saveLog(R result, Throwable throwable, M model); /** * 判断是否重复消费 * return true 重复消费 false 不重复消费 */ default boolean checkMessageKey(M model){ return false; } /** * 删除判断重复消费Key */ default void deleteCheckMessageKey(M model){} /** * 转换消息 */ default M conversion(String json){ Type parameterizedType getParameterizedType(M); M model JSONUtil.toBean(json, parameterizedType,true); T body model.getBody(); if(ValidateUtils.isNotEmpty(body)){ boolean isJsonObject body instanceof JSONObject; boolean isJsonArray body instanceof JSONArray; if(!isJsonObject !isJsonArray){ return model; } Type typeArgument TypeUtil.getTypeArgument(getClass(),0); body JSONUtil.toBean(JSONUtil.toJsonStr(body),typeArgument,true); } else { parameterizedType getParameterizedType(T); body JSONUtil.toBean(json,parameterizedType,true); } model.setBody(body); return model; } default Type getParameterizedType(String typeName){ MapType, Type typeMap TypeUtil.getTypeMap(getClass()); Type parameterizedType null; for(Map.EntryType,Type entry: typeMap.entrySet()){ if(StrUtil.equalsAnyIgnoreCase(typeName,entry.getKey().getTypeName())){ parameterizedType entry.getValue(); break; } } return parameterizedType; } }public abstract class AbstractConsumerT,R implements IMqttMessageListener, CommonMqttMessageListenerT,R,MqttModelT { protected final Logger logger LoggerUtils.logger(getClass()); protected String topic; protected MqttMessage mqttMessage; protected String tenantCode; Override public void messageArrived(String topic, MqttMessage message) throws Exception { this.topic topic; this.mqttMessage message; String json new String(message.getPayload(), StandardCharsets.UTF_8); Throwable throwable null; R result null; MqttModelT model null; try { model conversion(json); tenantCode model.getTenantCode(); logger.info(线程名:{},租户编码为:{},消息ID:{},topic主题:{},AbstractConsumer:消费者消息: {},Thread.currentThread().getName(),tenantCode,message.getId(),topic, json); // 判断是否重复消费 if(checkMessageKey(model)){ throw new BaseException(MqErrorCode.MESSAGE_REPEAT_CONSUMPTION); } if(ValidateUtils.isNotEmpty(tenantCode)){ RequestHeaderHolder.setTenantCode(tenantCode); } // 消费消息 result this.handleMessage(model.getBody()); } catch (Throwable e){ logger.error(AbstractConsumer:消费报错,消息为:{}, 异常为:,json, e); throwable e; } finally { deleteCheckMessageKey(model); // 保存消费成功/失败消息 saveLog(result,throwable,model); } } }9.测试代码RestController public class TestSendMqController { private final MqttUtils utils; public TestSendMqController(MqttUtils utils) { this.utils utils; } GetMapping(/test) public Object test() throws Exception { utils.send(new MqttModelString(test,top/test/123,123)); return null; } }MessageListener(topics top//123,tenantRange test12) public class TestHandler extends AbstractConsumerString,String { Override public String handleMessage(String body) throws Exception { logger.info(接受的主题是:{},内容是:{},topic,body); return ; } Override public void saveLog(String result, Throwable throwable, MqttModelString model) { } }i18n: all-locale: zh,en language: zh path: i18n/messages server: port: 8001 doc: title: mqtt测试用例 enabled: true globalRequestParameters: - name: token in: HEADER description: 用户认证令牌 required: true deprecated: false hidden: false spring: mvc: pathmatch: matching-strategy: ant_path_matcher jmx: enabled: true servlet: multipart: max-file-size: 20MB max-request-size: 20MB mqtt: tenant: test: user-name: admin password: admin url: tcp://127.0.0.1:1883

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询