From 1abe61e700003ea527c7e4f2ce59dbad2319fe84 Mon Sep 17 00:00:00 2001
From: lepdou <lepdou@126.com>
Date: Mon, 20 Feb 2017 17:54:10 +0800
Subject: [PATCH] send msg to mq when namespace published

---
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/component/config/PortalConfig.java     |   3 +++
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/listener/ConfigPublishListener.java    | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------------------------
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/MQService.java                     |  10 ++++++++++
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/configuration/MQConfiguration.java |  36 ++++++++++++++++++++++++++++++++++++
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/ctrip/CtripMQService.java          | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/defaultimpl/DefaultMQService.java  |  14 ++++++++++++++
 6 files changed, 326 insertions(+), 47 deletions(-)
 create mode 100644 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/MQService.java
 create mode 100644 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/configuration/MQConfiguration.java
 create mode 100644 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/ctrip/CtripMQService.java
 create mode 100644 apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/defaultimpl/DefaultMQService.java

diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/component/config/PortalConfig.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/component/config/PortalConfig.java
index d73d2c8..76a6f01 100644
--- a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/component/config/PortalConfig.java
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/component/config/PortalConfig.java
@@ -215,5 +215,8 @@ public class PortalConfig extends RefreshableConfig {
     return getValue("clogging.server.port");
   }
 
+  public String hermesServerAddress() {
+    return getValue("hermes.server.address");
+  }
 
 }
diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/listener/ConfigPublishListener.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/listener/ConfigPublishListener.java
index e32da3a..ca83d5f 100644
--- a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/listener/ConfigPublishListener.java
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/listener/ConfigPublishListener.java
@@ -2,6 +2,7 @@ package com.ctrip.framework.apollo.portal.listener;
 
 import com.ctrip.framework.apollo.common.constants.ReleaseOperation;
 import com.ctrip.framework.apollo.core.enums.Env;
+import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
 import com.ctrip.framework.apollo.portal.component.config.PortalConfig;
 import com.ctrip.framework.apollo.portal.component.emailbuilder.GrayPublishEmailBuilder;
 import com.ctrip.framework.apollo.portal.component.emailbuilder.MergeEmailBuilder;
@@ -11,12 +12,18 @@ import com.ctrip.framework.apollo.portal.entity.bo.Email;
 import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
 import com.ctrip.framework.apollo.portal.service.ReleaseHistoryService;
 import com.ctrip.framework.apollo.portal.spi.EmailService;
+import com.ctrip.framework.apollo.portal.spi.MQService;
 import com.ctrip.framework.apollo.tracer.Tracer;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.PostConstruct;
+
 @Component
 public class ConfigPublishListener {
 
@@ -34,75 +41,107 @@ public class ConfigPublishListener {
   private MergeEmailBuilder mergeEmailBuilder;
   @Autowired
   private PortalConfig portalConfig;
+  @Autowired
+  private MQService mqService;
 
+  private ExecutorService executorService;
+
+  @PostConstruct
+  public void init() {
+    executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("ConfigPublishNotify", false));
+  }
 
   @EventListener
   public void onConfigPublish(ConfigPublishEvent event) {
-    Env env = event.getConfigPublishInfo().getEnv();
-    if (!portalConfig.emailSupportedEnvs().contains(env)) {
-      return;
-    }
+    executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
+  }
 
-    ReleaseHistoryBO releaseHistory = getReleaseHistory(event);
 
-    if (releaseHistory == null) {
-      Tracer.logError("Will not send email, because load release history error", null);
-      return;
-    }
+  private class ConfigPublishNotifyTask implements Runnable {
 
-    int realOperation = releaseHistory.getOperation();
+    private ConfigPublishEvent.ConfigPublishInfo publishInfo;
 
-    Email email = null;
-    try {
-      email = buildEmail(env, releaseHistory, realOperation);
-    } catch (Throwable e) {
-      Tracer.logError("build email failed.", e);
+    ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
+      this.publishInfo = publishInfo;
     }
 
-    if (email != null) {
-      emailService.send(email);
+    @Override
+    public void run() {
+      ReleaseHistoryBO releaseHistory = getReleaseHistory();
+      if (releaseHistory == null) {
+        Tracer.logError("Load release history failed", null);
+        return;
+      }
+
+      sendPublishEmail(releaseHistory);
+
+      sendPublishMsg(releaseHistory);
     }
-  }
 
-  private ReleaseHistoryBO getReleaseHistory(ConfigPublishEvent event) {
-    ConfigPublishEvent.ConfigPublishInfo info = event.getConfigPublishInfo();
-    Env env = info.getEnv();
+    private ReleaseHistoryBO getReleaseHistory() {
+      Env env = publishInfo.getEnv();
 
-    int operation = info.isMergeEvent() ? ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER :
-                    info.isRollbackEvent() ? ReleaseOperation.ROLLBACK :
-                    info.isNormalPublishEvent() ? ReleaseOperation.NORMAL_RELEASE :
-                    info.isGrayPublishEvent() ? ReleaseOperation.GRAY_RELEASE : -1;
+      int operation = publishInfo.isMergeEvent() ? ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER :
+                      publishInfo.isRollbackEvent() ? ReleaseOperation.ROLLBACK :
+                      publishInfo.isNormalPublishEvent() ? ReleaseOperation.NORMAL_RELEASE :
+                      publishInfo.isGrayPublishEvent() ? ReleaseOperation.GRAY_RELEASE : -1;
 
-    if (operation == -1) {
-      return null;
-    }
+      if (operation == -1) {
+        return null;
+      }
 
-    if (info.isRollbackEvent()) {
-      return releaseHistoryService
-          .findLatestByPreviousReleaseIdAndOperation(env, info.getPreviousReleaseId(), operation);
-    } else {
-      return releaseHistoryService.findLatestByReleaseIdAndOperation(env, info.getReleaseId(), operation);
-    }
+      if (publishInfo.isRollbackEvent()) {
+        return releaseHistoryService
+            .findLatestByPreviousReleaseIdAndOperation(env, publishInfo.getPreviousReleaseId(), operation);
+      } else {
+        return releaseHistoryService.findLatestByReleaseIdAndOperation(env, publishInfo.getReleaseId(), operation);
+      }
 
-  }
+    }
 
+    private void sendPublishEmail(ReleaseHistoryBO releaseHistory) {
+      Env env = publishInfo.getEnv();
 
-  private Email buildEmail(Env env, ReleaseHistoryBO releaseHistory, int operation) {
-    switch (operation) {
-      case ReleaseOperation.GRAY_RELEASE: {
-        return grayPublishEmailBuilder.build(env, releaseHistory);
+      if (!portalConfig.emailSupportedEnvs().contains(env)) {
+        return;
       }
-      case ReleaseOperation.NORMAL_RELEASE: {
-        return normalPublishEmailBuilder.build(env, releaseHistory);
+
+      int realOperation = releaseHistory.getOperation();
+
+      Email email = null;
+      try {
+        email = buildEmail(env, releaseHistory, realOperation);
+      } catch (Throwable e) {
+        Tracer.logError("build email failed.", e);
       }
-      case ReleaseOperation.ROLLBACK: {
-        return rollbackEmailBuilder.build(env, releaseHistory);
+
+      if (email != null) {
+        emailService.send(email);
       }
-      case ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER: {
-        return mergeEmailBuilder.build(env, releaseHistory);
+    }
+
+    private void sendPublishMsg(ReleaseHistoryBO releaseHistory) {
+      mqService.sendPublishMsg(publishInfo.getEnv(), releaseHistory);
+    }
+
+    private Email buildEmail(Env env, ReleaseHistoryBO releaseHistory, int operation) {
+      switch (operation) {
+        case ReleaseOperation.GRAY_RELEASE: {
+          return grayPublishEmailBuilder.build(env, releaseHistory);
+        }
+        case ReleaseOperation.NORMAL_RELEASE: {
+          return normalPublishEmailBuilder.build(env, releaseHistory);
+        }
+        case ReleaseOperation.ROLLBACK: {
+          return rollbackEmailBuilder.build(env, releaseHistory);
+        }
+        case ReleaseOperation.GRAY_RELEASE_MERGE_TO_MASTER: {
+          return mergeEmailBuilder.build(env, releaseHistory);
+        }
+        default:
+          return null;
       }
-      default:
-        return null;
     }
   }
+
 }
diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/MQService.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/MQService.java
new file mode 100644
index 0000000..bb7ba9b
--- /dev/null
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/MQService.java
@@ -0,0 +1,10 @@
+package com.ctrip.framework.apollo.portal.spi;
+
+import com.ctrip.framework.apollo.core.enums.Env;
+import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
+
+public interface MQService {
+
+  void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory);
+
+}
diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/configuration/MQConfiguration.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/configuration/MQConfiguration.java
new file mode 100644
index 0000000..b0cc2a8
--- /dev/null
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/configuration/MQConfiguration.java
@@ -0,0 +1,36 @@
+package com.ctrip.framework.apollo.portal.spi.configuration;
+
+import com.ctrip.framework.apollo.portal.spi.ctrip.CtripMQService;
+import com.ctrip.framework.apollo.portal.spi.defaultimpl.DefaultMQService;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+@Configuration
+public class MQConfiguration {
+
+  @Configuration
+  @Profile("ctrip")
+  public static class CtripMQConfiguration {
+
+    @Bean
+    public CtripMQService mqService() {
+      return new CtripMQService();
+    }
+  }
+
+  /**
+   * spring.profiles.active != ctrip
+   */
+  @Configuration
+  @Profile({"!ctrip"})
+  public static class DefaultMQConfiguration {
+
+    @Bean
+    public DefaultMQService mqService() {
+      return new DefaultMQService();
+    }
+  }
+
+}
diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/ctrip/CtripMQService.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/ctrip/CtripMQService.java
new file mode 100644
index 0000000..63daad6
--- /dev/null
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/ctrip/CtripMQService.java
@@ -0,0 +1,177 @@
+package com.ctrip.framework.apollo.portal.spi.ctrip;
+
+import com.google.gson.Gson;
+
+import com.ctrip.framework.apollo.common.dto.ReleaseDTO;
+import com.ctrip.framework.apollo.common.entity.App;
+import com.ctrip.framework.apollo.core.enums.Env;
+import com.ctrip.framework.apollo.portal.component.config.PortalConfig;
+import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
+import com.ctrip.framework.apollo.portal.service.AppService;
+import com.ctrip.framework.apollo.portal.service.ReleaseService;
+import com.ctrip.framework.apollo.portal.spi.MQService;
+import com.ctrip.framework.apollo.tracer.Tracer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.SimpleClientHttpRequestFactory;
+import org.springframework.http.converter.FormHttpMessageConverter;
+import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Arrays;
+
+import javax.annotation.PostConstruct;
+
+
+public class CtripMQService implements MQService {
+
+  private static final org.apache.commons.lang.time.FastDateFormat
+      TIMESTAMP_FORMAT = org.apache.commons.lang.time.FastDateFormat.getInstance("yyyy-MM-dd hh:mm:ss");
+  private static final String CONFIG_PUBLISH_NOTIFY_TO_NOC_TOPIC = "ops.noc.record.created";
+
+  private Gson gson = new Gson();
+
+  @Autowired
+  private AppService appService;
+  @Autowired
+  private ReleaseService releaseService;
+  @Autowired
+  private PortalConfig portalConfig;
+
+  private RestTemplate restTemplate;
+
+  @PostConstruct
+  public void init() {
+    restTemplate = new RestTemplate();
+
+    SimpleClientHttpRequestFactory rf = (SimpleClientHttpRequestFactory) restTemplate.getRequestFactory();
+    rf.setReadTimeout(portalConfig.readTimeout());
+    rf.setConnectTimeout(portalConfig.connectTimeout());
+
+    MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
+    converter.setSupportedMediaTypes(
+        Arrays.asList(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM));
+
+    restTemplate.setMessageConverters(Arrays.asList(converter, new FormHttpMessageConverter()));
+
+  }
+
+  @Override
+  public void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
+    if (releaseHistory == null) {
+      return;
+    }
+
+    PublishMsg msg = buildPublishMsg(env, releaseHistory);
+
+    sendMsg(portalConfig.hermesServerAddress(), CONFIG_PUBLISH_NOTIFY_TO_NOC_TOPIC, msg);
+  }
+
+  private PublishMsg buildPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
+
+    PublishMsg msg = new PublishMsg();
+
+    msg.setPriority("δΈ­");
+    msg.setTool_origin("Apollo");
+
+    String appId = releaseHistory.getAppId();
+    App app = appService.load(appId);
+    msg.setInfluence_bu(app.getOrgId());
+    msg.setAppid(appId);
+
+    ReleaseDTO release = releaseService.findReleaseById(env, releaseHistory.getReleaseId());
+    msg.setAssginee(release.getDataChangeCreatedBy());
+    msg.setDesc(
+        gson.toJson(releaseService.compare(env, releaseHistory.getPreviousReleaseId(), releaseHistory.getReleaseId())));
+    msg.setOperation_time(TIMESTAMP_FORMAT.format(release.getDataChangeCreatedTime()));
+
+    return msg;
+  }
+
+  private void sendMsg(String serverAddress, String topic, Object msg) {
+    HttpHeaders headers = new HttpHeaders();
+    headers.setContentType(MediaType.parseMediaType(MediaType.APPLICATION_OCTET_STREAM + ";charset=UTF-8"));
+    HttpEntity<Object> request = new HttpEntity<>(msg, headers);
+
+    try {
+      //send msg by hermes RestAPI
+      restTemplate.postForObject(serverAddress + "/topics/" + topic, request, Object.class);
+
+    } catch (Exception e) {
+      Tracer.logError("Send publish msg to hermes failed", e);
+    }
+
+  }
+
+  private class PublishMsg {
+
+    private String assginee;
+    private String desc;
+    private String operation_time;
+    private String tool_origin;
+    private String priority;
+    private String influence_bu;
+    private String appid;
+
+
+    public String getAssginee() {
+      return assginee;
+    }
+
+    public void setAssginee(String assginee) {
+      this.assginee = assginee;
+    }
+
+    public String getDesc() {
+      return desc;
+    }
+
+    public void setDesc(String desc) {
+      this.desc = desc;
+    }
+
+    public String getOperation_time() {
+      return operation_time;
+    }
+
+    public void setOperation_time(String operation_time) {
+      this.operation_time = operation_time;
+    }
+
+    public String getTool_origin() {
+      return tool_origin;
+    }
+
+    public void setTool_origin(String tool_origin) {
+      this.tool_origin = tool_origin;
+    }
+
+    public String getPriority() {
+      return priority;
+    }
+
+    public void setPriority(String priority) {
+      this.priority = priority;
+    }
+
+    public String getInfluence_bu() {
+      return influence_bu;
+    }
+
+    public void setInfluence_bu(String influence_bu) {
+      this.influence_bu = influence_bu;
+    }
+
+    public String getAppid() {
+      return appid;
+    }
+
+    public void setAppid(String appid) {
+      this.appid = appid;
+    }
+  }
+
+}
diff --git a/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/defaultimpl/DefaultMQService.java b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/defaultimpl/DefaultMQService.java
new file mode 100644
index 0000000..b947747
--- /dev/null
+++ b/apollo-portal/src/main/java/com/ctrip/framework/apollo/portal/spi/defaultimpl/DefaultMQService.java
@@ -0,0 +1,14 @@
+package com.ctrip.framework.apollo.portal.spi.defaultimpl;
+
+import com.ctrip.framework.apollo.core.enums.Env;
+import com.ctrip.framework.apollo.portal.entity.bo.ReleaseHistoryBO;
+import com.ctrip.framework.apollo.portal.spi.MQService;
+
+public class DefaultMQService implements MQService{
+
+  @Override
+  public void sendPublishMsg(Env env, ReleaseHistoryBO releaseHistory) {
+    //do nothing
+  }
+
+}
--
libgit2 0.27.1