Commit 4783b074 by Dave Syer

Support for multi-valued (batched) events in stream aggregator

This allows clients to send batched up events in the same format as before (or to continue to send single events). We can switch the default format to an array in 1.4.x.
parent 0334c5a2
......@@ -17,6 +17,7 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -56,18 +57,33 @@ public class HystrixStreamAggregator {
payload = payload.replace("\\\"", "\"");
}
try {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
Map<String, Object> data = getPayloadData(map);
log.debug("Received hystrix stream payload: " + data);
this.subject.onNext(data);
if (payload.startsWith("[")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> list = this.objectMapper.readValue(payload,
List.class);
for (Map<String, Object> map : list) {
sendMap(map);
}
}
else {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
sendMap(map);
}
}
catch (IOException ex) {
log.error("Error receiving hystrix stream payload: " + payload, ex);
}
}
private void sendMap(Map<String, Object> map) {
Map<String, Object> data = getPayloadData(map);
if (log.isDebugEnabled()) {
log.debug("Received hystrix stream payload: " + data);
}
this.subject.onNext(data);
}
public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
@SuppressWarnings("unchecked")
Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");
......
......@@ -16,18 +16,19 @@
package org.springframework.cloud.netflix.turbine.stream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.boot.test.rule.OutputCapture;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import rx.subjects.PublishSubject;
......@@ -53,6 +54,15 @@ public class HystrixStreamAggregatorTests {
}
@Test
public void messageWrappedInArray() throws Exception {
this.publisher.subscribe(map -> {
assertThat(map.get("type"), equalTo("HystrixCommand"));
});
this.aggregator.sendToSubject("[" + PAYLOAD + "]");
this.output.expect(not(containsString("ERROR")));
}
@Test
public void doubleEncodedMessage() throws Exception {
this.publisher.subscribe(map -> {
assertThat(map.get("type"), equalTo("HystrixCommand"));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment