Commit 1d1ad75a by Dave Syer

Allow double-encoded JSON (as sent by Angel clients) in turbine

Fixes gh-873
parent 5bd18e45
......@@ -19,32 +19,42 @@ package org.springframework.cloud.netflix.turbine.stream;
import java.io.IOException;
import java.util.Map;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import rx.subjects.PublishSubject;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.apachecommons.CommonsLog;
import rx.subjects.PublishSubject;
/**
* @author Spencer Gibb
*/
@CommonsLog
@Component //needed for ServiceActivator to be picked up
@Component // needed for ServiceActivator to be picked up
public class HystrixStreamAggregator {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private PublishSubject<Map<String, Object>> subject;
@Autowired
public HystrixStreamAggregator(ObjectMapper objectMapper,
PublishSubject<Map<String, Object>> subject) {
this.objectMapper = objectMapper;
this.subject = subject;
}
@ServiceActivator(inputChannel = TurbineStreamClient.INPUT)
public void sendToSubject(String payload) {
public void sendToSubject(@Payload String payload) {
if (payload.startsWith("\"")) {
// Legacy payload from an Angel client
payload = payload.substring(1, payload.length() - 1);
payload = payload.replace("\\\"", "\"");
}
try {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Qualifier;
/**
* Annotation to mark a bean as the Rabbit ConnectionFactory for Spring Cloud Turbine
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface TurbineConnectionFactory {
}
......@@ -16,6 +16,8 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -27,20 +29,15 @@ import org.springframework.cloud.stream.config.ChannelBindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.subjects.PublishSubject;
/**
* Autoconfiguration for a Spring Cloud Turbine using Spring Cloud Stream. Enabled by
* default if spring-cloud-stream is on the classpath, and can be switched off with
* <code>turbine.stream.enabled</code>.
*
* If there is a single {@link ConnectionFactory} in the context it will be used, or if
* there is a one qualified as <code>@TurbineConnectionFactory</code> it will be preferred
* over others, otherwise the <code>@Primary</code> one will be used. If there are
* multiple unqualified connection factories there will be an autowiring error. Note that
* Spring Boot (as of 1.2.2) creates a ConnectionFactory that is <i>not</i>
* <code>@Primary</code>, so if you want to use one connection factory for turbine and
* another for business messages, you need to create both, and annotate them
* <code>@TurbineConnectionFactory</code> and <code>@Primary</code> respectively.
*
* @author Spencer Gibb
* @author Dave Syer
*/
......@@ -75,8 +72,9 @@ public class TurbineStreamAutoConfiguration {
}
@Bean
public HystrixStreamAggregator hystrixStreamAggregator() {
return new HystrixStreamAggregator();
public HystrixStreamAggregator hystrixStreamAggregator(ObjectMapper mapper,
PublishSubject<Map<String, Object>> publisher) {
return new HystrixStreamAggregator(mapper, publisher);
}
}
......@@ -16,18 +16,11 @@
package org.springframework.cloud.netflix.turbine.stream;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
......@@ -36,15 +29,20 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import rx.Observable;
import rx.subjects.PublishSubject;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import lombok.extern.apachecommons.CommonsLog;
import rx.Observable;
import rx.subjects.PublishSubject;
/**
* @author Spencer Gibb
*/
......@@ -62,7 +60,8 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (Stream)", TurbineStreamProperties.class);
return HasFeatures.namedFeature("Turbine (Stream)",
TurbineStreamProperties.class);
}
@Bean
......@@ -75,16 +74,14 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
.aggregateGroupedStreams(
hystrixSubject().groupBy(
data -> InstanceKey.create((String) data
.get("instanceId"))))
.aggregateGroupedStreams(hystrixSubject().groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> Collections.singletonMap("type", (Object) "Ping"))
.publish().refCount();
.map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
......@@ -93,16 +90,15 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(
this.turbinePort,
(request, response) -> {
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(
data -> response.writeAndFlush(new ServerSentEvent(
null, null, JsonUtility.mapToJson(data))));
return output
.doOnUnsubscribe(() -> log
.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null, null, JsonUtility.mapToJson(data))));
}, sseServerConfigurator());
return httpServer;
}
......@@ -130,7 +126,8 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
if (this.running.compareAndSet(true, false)) {
try {
aggregatorServer().shutdown();
} catch (InterruptedException ex) {
}
catch (InterruptedException ex) {
log.error("Error shutting down", ex);
}
}
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import rx.Observable;
import rx.observables.GroupedObservable;
import org.springframework.util.ReflectionUtils;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.aggregator.TypeAndNameKey;
import com.netflix.turbine.internal.JsonUtility;
import static org.springframework.cloud.netflix.turbine.stream.HystrixStreamAggregator.getPayloadData;
public class HystrixStreamAggregatorTest {
public static final String STREAM_ALL = "hystrixtest";
public static void main(String[] args) {
getHystrixStreamFromFile(STREAM_ALL, 1)
.flatMap(commandGroup -> commandGroup.take(50)).take(50).toBlocking()
.forEach(s -> System.out.println("s: " + s));
}
// a hack to simulate a stream
public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> getHystrixStreamFromFile(
final String stream, final int latencyBetweenEvents) {
Observable<Map<String, Object>> objectObservable = Observable.create(sub -> {
try {
while (!sub.isUnsubscribed()) {
String packagePath = HystrixStreamAggregatorTest.class.getPackage().getName()
.replace('.', '/');
InputStream file = HystrixStreamAggregatorTest.class.getResourceAsStream("/"
+ packagePath + "/" + stream + ".stream");
BufferedReader in = new BufferedReader(new InputStreamReader(file));
String line = null;
while ((line = in.readLine()) != null && !sub.isUnsubscribed()) {
if (!line.trim().equals("")) {
if (line.trim().startsWith("{")) {
String json = line.trim();
try {
Map<String, Object> jsonMap = JsonUtility
.jsonToMap(json);
Map<String, Object> data = getPayloadData(jsonMap);
sub.onNext(data);
Thread.sleep(latencyBetweenEvents);
}
catch (Exception ex) {
ReflectionUtils.rethrowException(ex);
}
}
}
}
}
}
catch (Exception ex) {
sub.onError(ex);
}
});
Observable<GroupedObservable<InstanceKey, Map<String, Object>>> observable = objectObservable
.groupBy(data -> InstanceKey.create((String) data.get("instanceId")));
return StreamAggregator.aggregateGroupedStreams(observable);
}
// TODO
/*public static GroupedObservable<InstanceKey, Map<String, Object>> getHystrixStreamFromFileEachLineScheduledEvery10Milliseconds(final String stream, final int instanceID, final TestScheduler scheduler, int maxTime) {
TestSubject<Map<String, Object>> scheduledOrigin = TestSubject.create(scheduler);
try {
String packagePath = FileStream.class.getPackage().getName().replace('.', '/');
InputStream file = FileStream.class.getResourceAsStream("/" + packagePath + "/" + stream + ".stream");
BufferedReader in = new BufferedReader(new InputStreamReader(file));
String line = null;
int time = 0;
while ((line = in.readLine()) != null && time < maxTime) {
if (!line.trim().equals("")) {
if (line.startsWith("data: ")) {
time = time + 10; // increment by 10 milliseconds
String json = line.substring(6);
try {
Map<String, Object> jsonMap = JsonUtility.jsonToMap(json);
// System.err.println(instanceID + " => scheduling at time: " + time + " => " + jsonMap);
scheduledOrigin.onNext(jsonMap, time);
} catch (Exception e) {
System.err.println("bad data");
}
}
}
}
scheduledOrigin.onCompleted(maxTime);
} catch (Exception e) {
throw new RuntimeException(e);
}
return createGroupedObservable(InstanceKey.create(instanceID), scheduledOrigin.subscribeOn(scheduler));
}*/
}
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import java.util.Map;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.boot.test.OutputCapture;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import rx.subjects.PublishSubject;
public class HystrixStreamAggregatorTests {
private ObjectMapper mapper = new ObjectMapper();
private PublishSubject<Map<String, Object>> publisher = PublishSubject.create();
private HystrixStreamAggregator aggregator = new HystrixStreamAggregator(this.mapper,
this.publisher);
@Rule
public OutputCapture output = new OutputCapture();
@Test
public void messageDecoded() throws Exception {
this.publisher.subscribe(map -> {
assertThat(map.get("type"), equalTo("HystrixCommand"));
});
this.aggregator.sendToSubject(PAYLOAD);
this.output.expect(not(containsString("ERROR")));
}
@Test
public void doubleEncodedMessage() throws Exception {
this.publisher.subscribe(map -> {
assertThat(map.get("type"), equalTo("HystrixCommand"));
});
// If The JSON is embedded in a JSON String this is what it looks like
String payload = "\"" + PAYLOAD.replace("\"", "\\\"") + "\"";
this.aggregator.sendToSubject(payload);
this.output.expect(not(containsString("ERROR")));
}
private static String PAYLOAD = "{\"origin\":{\"host\":\"dsyer\",\"port\":-1,\"serviceId\":\"application\",\"id\":\"application\"},\"data\":{\"type\":\"HystrixCommand\",\"name\":\"application.ok\",\"group\":\"MyService\",\"currentTime\":1457089387160,\"isCircuitBreakerOpen\":false,\"errorPercentage\":0,\"errorCount\":0,\"requestCount\":0,\"rollingCountCollapsedRequests\":0,\"rollingCountExceptionsThrown\":0,\"rollingCountFailure\":0,\"rollingCountFallbackFailure\":0,\"rollingCountFallbackRejection\":0,\"rollingCountFallbackSuccess\":0,\"rollingCountResponsesFromCache\":0,\"rollingCountSemaphoreRejected\":0,\"rollingCountShortCircuited\":0,\"rollingCountSuccess\":1,\"rollingCountThreadPoolRejected\":0,\"rollingCountTimeout\":0,\"currentConcurrentExecutionCount\":0,\"latencyExecute_mean\":0,\"latencyExecute\":{\"0\":0,\"25\":0,\"50\":0,\"75\":0,\"90\":0,\"95\":0,\"99\":0,\"99.5\":0,\"100\":0},\"latencyTotal_mean\":0,\"latencyTotal\":{\"0\":0,\"25\":0,\"50\":0,\"75\":0,\"90\":0,\"95\":0,\"99\":0,\"99.5\":0,\"100\":0},\"propertyValue_circuitBreakerRequestVolumeThreshold\":20,\"propertyValue_circuitBreakerSleepWindowInMilliseconds\":5000,\"propertyValue_circuitBreakerErrorThresholdPercentage\":50,\"propertyValue_circuitBreakerForceOpen\":false,\"propertyValue_circuitBreakerForceClosed\":false,\"propertyValue_circuitBreakerEnabled\":true,\"propertyValue_executionIsolationStrategy\":\"THREAD\",\"propertyValue_executionIsolationThreadTimeoutInMilliseconds\":1000,\"propertyValue_executionIsolationThreadInterruptOnTimeout\":true,\"propertyValue_executionIsolationThreadPoolKeyOverride\":null,\"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests\":10,\"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests\":10,\"propertyValue_metricsRollingStatisticalWindowInMilliseconds\":10000,\"propertyValue_requestCacheEnabled\":true,\"propertyValue_requestLogEnabled\":true,\"reportingHosts\":1}}";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment