Commit f63603c4 by Johannes Edmeier

upgrade to rxjs 6

parent 87344df6
......@@ -5841,6 +5841,15 @@
"integrity": "sha1-7QMXwyIGT3lGbAKWa922Bas32Zg=",
"dev": true
},
"rxjs": {
"version": "5.5.11",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-5.5.11.tgz",
"integrity": "sha512-3bjO7UwWfA2CV7lmwYMBzj4fQ6Cq+ftHc2MvUe+WMS7wcdJ1LosDWmdjPQanYp2dBRj572p7PeU81JUxHKOcBA==",
"dev": true,
"requires": {
"symbol-observable": "1.0.1"
}
},
"strip-ansi": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-4.0.0.tgz",
......@@ -11091,11 +11100,11 @@
}
},
"rxjs": {
"version": "5.5.11",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-5.5.11.tgz",
"integrity": "sha512-3bjO7UwWfA2CV7lmwYMBzj4fQ6Cq+ftHc2MvUe+WMS7wcdJ1LosDWmdjPQanYp2dBRj572p7PeU81JUxHKOcBA==",
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-6.2.1.tgz",
"integrity": "sha512-OwMxHxmnmHTUpgO+V7dZChf3Tixf4ih95cmXjzzadULziVl/FKhHScGLj4goEw9weePVOH2Q0+GcCBUhKCZc/g==",
"requires": {
"symbol-observable": "1.0.1"
"tslib": "^1.9.0"
}
},
"safe-buffer": {
......@@ -12009,7 +12018,8 @@
"symbol-observable": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/symbol-observable/-/symbol-observable-1.0.1.tgz",
"integrity": "sha1-g0D8RwLDEi310iKI+IKD9RPT/dQ="
"integrity": "sha1-g0D8RwLDEi310iKI+IKD9RPT/dQ=",
"dev": true
},
"symbol-tree": {
"version": "3.2.2",
......@@ -12278,6 +12288,11 @@
}
}
},
"tslib": {
"version": "1.9.3",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-1.9.3.tgz",
"integrity": "sha512-4krF8scpejhaOgqzBEcGM7yDIEfi0/8+8zDRZhNZZ2kjmHJ4hv3zCbQWxoJGz1iw5U0Jl0nma13xzHXcncMavQ=="
},
"tty-browserify": {
"version": "0.0.0",
"resolved": "https://registry.npmjs.org/tty-browserify/-/tty-browserify-0.0.0.tgz",
......
......@@ -33,7 +33,7 @@
"popper.js": "^1.14.3",
"pretty-bytes": "^5.1.0",
"resize-observer-polyfill": "^1.5.0",
"rxjs": "^5.5.11",
"rxjs": "^6.2.1",
"vue": "^2.5.16",
"vue-clickaway": "^2.2.1",
"vue-router": "^3.0.1",
......
......@@ -16,7 +16,7 @@
import axios from '@/utils/axios';
import waitForPolyfill from '@/utils/eventsource-polyfill';
import {Observable} from '@/utils/rxjs';
import {concat, from, ignoreElements, Observable} from '@/utils/rxjs';
import uri from '@/utils/uri';
import * as _ from 'lodash';
import Instance from './instance';
......@@ -46,7 +46,8 @@ class Application {
}
static getStream() {
return Observable.from(waitForPolyfill()).ignoreElements().concat(
return concat(
from(waitForPolyfill()).pipe(ignoreElements()),
Observable.create(observer => {
const eventSource = new EventSource('applications');
eventSource.onmessage = message => observer.next({
......@@ -58,7 +59,8 @@ class Application {
return () => {
eventSource.close();
};
}));
})
);
}
static _transformResponse(data) {
......
......@@ -17,7 +17,7 @@
import axios from '@/utils/axios';
import waitForPolyfill from '@/utils/eventsource-polyfill';
import logtail from '@/utils/logtail';
import {Observable} from '@/utils/rxjs'
import {concat, from, ignoreElements, Observable} from '@/utils/rxjs';
import uri from '@/utils/uri';
import _ from 'lodash';
......@@ -222,17 +222,20 @@ class Instance {
}
static getEventStream() {
return Observable.from(waitForPolyfill()).ignoreElements().concat(Observable.create(observer => {
const eventSource = new EventSource('instances/events');
eventSource.onmessage = message => observer.next({
...message,
data: JSON.parse(message.data)
});
eventSource.onerror = err => observer.error(err);
return () => {
eventSource.close();
};
}));
return concat(
from(waitForPolyfill()).pipe(ignoreElements()),
Observable.create(observer => {
const eventSource = new EventSource('instances/events');
eventSource.onmessage = message => observer.next({
...message,
data: JSON.parse(message.data)
});
eventSource.onerror = err => observer.error(err);
return () => {
eventSource.close();
};
})
);
}
static async get(id) {
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
import Application from '@/services/application';
import {Observable} from '@/utils/rxjs';
import {concat, concatMap, defer, delay, doFirst, map, retryWhen, tap} from '@/utils/rxjs';
export default class {
constructor() {
......@@ -71,13 +71,19 @@ export default class {
}
start() {
const listing = Observable.defer(() => Application.list()).concatMap(message => message.data);
const stream = Application.getStream().map(message => message.data);
this.subscription = listing.concat(stream)
.doFirst(() => this._dispatchEvent('connected'))
.retryWhen(errors => errors
.do(error => this._dispatchEvent('error', error))
.delay(5000)
const list = defer(() => Application.list())
.pipe(concatMap(message => message.data));
const stream = Application.getStream()
.pipe(map(message => message.data));
this.subscription = concat(list, stream)
.pipe(
doFirst(() => this._dispatchEvent('connected')),
retryWhen(
errors => errors.pipe(
tap(error => this._dispatchEvent('error', error)),
delay(5000)
)
)
).subscribe({
next: application => {
const idx = this.applications.indexOfApplication(application.name);
......
......@@ -14,59 +14,62 @@
* limitations under the License.
*/
import {concatMap, EMPTY, of, timer} from '@/utils/rxjs';
import axios from './axios';
import {Observable} from './rxjs';
export default (url, interval, initialSize = 300 * 1024) => {
let range = `bytes=-${initialSize}`;
let size = 0;
return Observable.timer(0, interval)
.concatMap(() =>
axios.get(url, {
headers: {
range
}
}))
.concatMap(response => {
const initial = size === 0;
const contentLength = response.data.length;
if (response.status === 200) {
if (!initial) {
throw 'Expected 206 - Partial Content on subsequent requests.';
return timer(0, interval)
.pipe(
concatMap(() =>
axios.get(url, {
headers: {
range
}
})
),
concatMap(response => {
const initial = size === 0;
const contentLength = response.data.length;
if (response.status === 200) {
if (!initial) {
throw 'Expected 206 - Partial Content on subsequent requests.';
}
size = contentLength;
} else if (response.status === 206) {
size = parseInt(response.headers['content-range'].split('/')[1]);
} else {
throw 'Unexpected response status: ' + response.status;
}
size = contentLength;
} else if (response.status === 206) {
size = parseInt(response.headers['content-range'].split('/')[1]);
} else {
throw 'Unexpected response status: ' + response.status;
}
// Reload the last byte to avoid a 416: Range unsatisfiable.
// If the response has length = 1 the file hasn't beent changed.
// If the response status is 416 the logfile has been truncated.
range = `bytes=${size - 1}-`;
// Reload the last byte to avoid a 416: Range unsatisfiable.
// If the response has length = 1 the file hasn't beent changed.
// If the response status is 416 the logfile has been truncated.
range = `bytes=${size - 1}-`;
let addendum = null;
let skipped = 0;
let addendum = null;
let skipped = 0;
if (initial) {
if (contentLength >= size) {
addendum = response.data;
} else {
// In case of a partial response find the first line break.
addendum = response.data.substring(response.data.indexOf('\n') + 1);
skipped = size - addendum.length;
if (initial) {
if (contentLength >= size) {
addendum = response.data;
} else {
// In case of a partial response find the first line break.
addendum = response.data.substring(response.data.indexOf('\n') + 1);
skipped = size - addendum.length;
}
} else if (response.data.length > 1) {
// Remove the first byte which has been part of the previos response.
addendum = response.data.substring(1);
}
} else if (response.data.length > 1) {
// Remove the first byte which has been part of the previos response.
addendum = response.data.substring(1);
}
return addendum ? Observable.of({
totalBytes: size,
skipped,
addendum
}) : Observable.empty();
});
return addendum ? of({
totalBytes: size,
skipped,
addendum
}) : EMPTY;
})
);
}
......@@ -14,66 +14,67 @@
* limitations under the License.
*/
import 'rxjs/add/observable/defer';
import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/from';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/concatAll';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/ignoreElements';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/merge';
import 'rxjs/add/operator/retryWhen';
import {defer} from 'rxjs/internal/observable/defer';
import {tap} from 'rxjs/internal/operators/tap';
import {Observable} from 'rxjs/Observable';
import {animationFrame} from 'rxjs/scheduler/animationFrame';
import {Subject} from 'rxjs/Subject';
export {throwError} from 'rxjs/internal/observable/throwError';
export {of} from 'rxjs/internal/observable/of';
export {defer} from 'rxjs/internal/observable/defer';
export {concat} from 'rxjs/internal/observable/concat';
export {EMPTY} from 'rxjs/internal/observable/empty';
export {from} from 'rxjs/internal/observable/from';
export {timer} from 'rxjs/internal/observable/timer';
export {Observable} from 'rxjs/internal/Observable';
export {Subject} from 'rxjs/internal/Subject';
export {animationFrame as animationFrameScheduler} from 'rxjs/internal/scheduler/animationFrame';
Observable.prototype.doOnSubscribe = function (onSubscribe) {
let source = this;
return Observable.defer(() => {
onSubscribe();
return source;
export {concatMap} from 'rxjs/internal/operators/concatMap';
export {delay} from 'rxjs/internal/operators/delay';
export {merge} from 'rxjs/internal/operators/merge';
export {map} from 'rxjs/internal/operators/map';
export {retryWhen} from 'rxjs/internal/operators/retryWhen';
export {tap} from 'rxjs/internal/operators/tap';
export {filter} from 'rxjs/internal/operators/filter';
export {concatAll} from 'rxjs/internal/operators/concatAll';
export {ignoreElements} from 'rxjs/internal/operators/ignoreElements';
export const doOnSubscribe = cb => source =>
defer(() => {
cb();
return source
});
};
Observable.prototype.doFirst = function (doFirst) {
let source = this;
let triggered = false;
return Observable.defer(() => {
export const doFirst = cb => source => {
let triggered;
return defer(() => {
triggered = false;
return source;
}).do(n => {
if (!triggered) {
triggered = true;
doFirst(n);
}
});
}).pipe(
tap( v => {
if (!triggered) {
triggered = true;
cb(v);
}
})
);
};
Observable.prototype.listen = function (callbackFn) {
export const listen = (cb, execDelay = 150) => source => {
let handle = null;
return this.doOnSubscribe(() => handle = setTimeout(() => callbackFn('executing'), 150))
.do({
return source.pipe(
doOnSubscribe(() => handle = setTimeout(() => cb('executing'), execDelay)),
tap({
complete: () => {
handle && clearTimeout(handle);
callbackFn('completed');
cb('completed');
},
error: (error) => {
console.warn('Operation failed:', error);
handle && clearTimeout(handle);
callbackFn('failed');
cb('failed');
}
});
};
export {
Observable,
Subject,
animationFrame
})
)
};
/*
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {concat, EMPTY, of, throwError} from '@/utils/rxjs';
import {delay, doFirst, doOnSubscribe, listen} from './rxjs';
describe('doOnSubscribe', () => {
it('should call callback when subscribing', done => {
const cb = jest.fn();
EMPTY.pipe(
doOnSubscribe(cb)
).subscribe({
complete: () => {
expect(cb).toHaveBeenCalledTimes(1);
done();
}
});
});
});
describe('doFirst', () => {
it('should not call callback when empty', done => {
const cb = jest.fn();
EMPTY.pipe(
doFirst(cb)
).subscribe({
complete: () => {
expect(cb).not.toBeCalled();
done();
}
});
});
it('should call callback on first item', done => {
const cb = jest.fn();
of(1,2,3).pipe(
doFirst(cb)
).subscribe({
complete: () => {
expect(cb).toHaveBeenCalledTimes(1);
done();
}
});
});
});
describe('listen', () => {
it('should call callback with complete', done => {
const cb = jest.fn();
EMPTY.pipe(
listen(cb)
).subscribe({
complete: () => {
expect(cb).toHaveBeenCalledTimes(1);
expect(cb).toHaveBeenCalledWith('completed');
done();
}
});
});
it('should call callback with executing and complete', done => {
const cb = jest.fn();
of(1).pipe(
delay(10),
listen(cb, 1)
).subscribe({
complete: () => {
expect(cb).toHaveBeenCalledTimes(2);
expect(cb).toHaveBeenCalledWith('executing');
expect(cb).toHaveBeenCalledWith('completed');
done();
}
});
});
it('should call callback with failed', done => {
const cb = jest.fn();
throwError(new Error('test')).pipe(
listen(cb)
).subscribe({
error: () => {
expect(cb).toHaveBeenCalledTimes(1);
expect(cb).toHaveBeenCalledWith('failed');
done();
}
});
});
it('should call callback with executing and failed', done => {
const cb = jest.fn();
concat(
of(1).pipe(delay(10)),
throwError(new Error('test'))
).pipe(
listen(cb, 1)
).subscribe({
error: () => {
expect(cb).toHaveBeenCalledTimes(2);
expect(cb).toHaveBeenCalledWith('executing');
expect(cb).toHaveBeenCalledWith('failed');
done();
}
});
});
});
......@@ -94,7 +94,7 @@
import Popper from '@/directives/popper';
import subscribing from '@/mixins/subscribing';
import NotificationFilter from '@/services/notification-filter';
import {Observable, Subject} from '@/utils/rxjs';
import {concatMap, merge, Subject, timer} from '@/utils/rxjs';
import {directive as onClickaway} from 'vue-clickaway';
import NotificationFilterSettings from './notification-filter-settings';
......@@ -176,9 +176,11 @@
createSubscription() {
const vm = this;
vm.notificationFilterSubject = new Subject();
return Observable.timer(0, 60000)
.merge(vm.notificationFilterSubject)
.concatMap(this.fetchNotificationFilters)
return timer(0, 60000)
.pipe(
merge(vm.notificationFilterSubject),
concatMap(this.fetchNotificationFilters),
)
.subscribe({
next: data => {
vm.notificationFilters = data;
......
......@@ -39,7 +39,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import AuditeventsList from '@/views/instances/auditevents/auditevents-list';
import _ from 'lodash';
import moment from 'moment';
......@@ -100,8 +100,10 @@
const vm = this;
vm.lastTimestamp = moment(0);
vm.error = null;
return Observable.timer(0, 5000)
.concatMap(this.fetchAuditevents)
return timer(0, 5000)
.pipe(
concatMap(this.fetchAuditevents)
)
.subscribe({
next: events => {
vm.hasLoaded = true;
......
......@@ -60,7 +60,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
import cacheChart from './cache-chart';
......@@ -117,8 +117,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(vm.fetchMetrics)
return timer(0, 2500)
.pipe(concatMap(vm.fetchMetrics))
.subscribe({
next: data => {
vm.hasLoaded = true;
......
......@@ -24,7 +24,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import _ from 'lodash';
import detailsCache from './details-cache';
......@@ -41,14 +41,14 @@
caches: [],
}),
methods: {
async fetchcaches() {
async fetchCaches() {
const response = await this.instance.fetchMetric('cache.gets');
return _.uniq(response.data.availableTags.filter(tag => tag.tag === 'name')[0].values);
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchcaches)
return timer(0, 2500)
.pipe(concatMap(this.fetchCaches))
.subscribe({
next: names => {
vm.caches = names
......
......@@ -55,7 +55,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
import datasourceChart from './datasource-chart';
......@@ -92,8 +92,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(vm.fetchMetrics)
return timer(0, 2500)
.pipe(concatMap(vm.fetchMetrics))
.subscribe({
next: data => {
vm.hasLoaded = true;
......
......@@ -24,7 +24,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import detailsDatasource from './details-datasource';
export default {
......@@ -46,8 +46,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchDataSources)
return timer(0, 2500)
.pipe(concatMap(this.fetchDataSources))
.subscribe({
next: names => {
vm.dataSources = names
......
......@@ -53,7 +53,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
export default {
......@@ -86,8 +86,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchMetrics)
return timer(0, 2500)
.pipe(concatMap(this.fetchMetrics))
.subscribe({
next: data => {
vm.hasLoaded = true;
......
......@@ -60,7 +60,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
import prettyBytes from 'pretty-bytes';
import memChart from './mem-chart';
......@@ -114,8 +114,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchMetrics)
return timer(0, 2500)
.pipe(concatMap(this.fetchMetrics))
.subscribe({
next: data => {
vm.hasLoaded = true;
......
......@@ -67,7 +67,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import processUptime from './process-uptime';
export default {
......@@ -124,8 +124,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchCpuLoadMetrics)
return timer(0, 2500)
.pipe(concatMap(this.fetchCpuLoadMetrics))
.subscribe({
next: data => {
vm.processCpuLoad = data.processCpuLoad;
......
......@@ -54,7 +54,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
import threadsChart from './threads-chart';
......@@ -87,8 +87,8 @@
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(this.fetchMetrics)
return timer(0, 2500)
.pipe(concatMap(this.fetchMetrics))
.subscribe({
next: data => {
vm.hasLoaded = true;
......
......@@ -15,7 +15,7 @@
*/
import subscribing from '@/mixins/subscribing';
import {Observable} from '@/utils/rxjs';
import {timer} from '@/utils/rxjs';
import moment from 'moment';
export default {
......@@ -44,11 +44,11 @@ export default {
createSubscription() {
if (this.value) {
const vm = this;
this.startTs = moment.now();
this.offset = 0;
return Observable.timer(0, 1000).subscribe({
vm.startTs = moment.now();
vm.offset = 0;
return timer(0, 1000).subscribe({
next: () => {
vm.offset = moment.now().valueOf() - this.startTs.valueOf();
vm.offset = moment.now().valueOf() - vm.startTs.valueOf();
}
})
}
......
......@@ -91,7 +91,7 @@
<script>
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, filter, from, listen} from '@/utils/rxjs';
import _ from 'lodash';
export default {
......@@ -151,8 +151,8 @@
}, 250),
refreshContext() {
const vm = this;
Observable.from(vm.instance.refreshContext())
.listen(status => vm.refreshStatus = status)
from(vm.instance.refreshContext())
.pipe(listen(status => vm.refreshStatus = status))
.subscribe({
complete: () => {
setTimeout(() => vm.refreshStatus = null, 2500);
......@@ -163,11 +163,14 @@
},
updateEnvironment() {
const vm = this;
Observable.from(vm.managedProperties)
.filter(property => !!property.name && property.input !== property.value)
.listen(status => vm.updateStatus = status)
.concatMap(property => Observable.from(vm.instance.setEnv(property.name, property.input))
.listen(status => property.status = status)
from(vm.managedProperties)
.pipe(
filter(property => !!property.name && property.input !== property.value),
listen(status => vm.updateStatus = status),
concatMap(
property => from(vm.instance.setEnv(property.name, property.input))
.pipe(listen(status => property.status = status))
)
)
.subscribe({
complete: () => {
......@@ -179,8 +182,8 @@
},
resetEnvironment() {
const vm = this;
Observable.from(vm.instance.resetEnv())
.listen(status => vm.resetStatus = status)
from(vm.instance.resetEnv())
.pipe(listen(status => vm.resetStatus = status))
.subscribe({
complete: () => {
vm.managedProperties = [{
......
......@@ -85,7 +85,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import moment from 'moment';
import sbaTracesChart from './traces-chart';
import sbaTracesList from './traces-list';
......@@ -110,6 +110,7 @@
if (contentLength && /^\d+$/.test(contentLength)) {
return parseInt(contentLength);
}
return null;
}
get contentType() {
......@@ -118,6 +119,7 @@
const idx = contentType.indexOf(';');
return idx >= 0 ? contentType.substring(0, idx) : contentType;
}
return null;
}
compareTo(other) {
......@@ -197,8 +199,8 @@
const vm = this;
vm.lastTimestamp = moment(0);
vm.error = null;
return Observable.timer(0, 5000)
.concatMap(vm.fetchHttptrace)
return timer(0, 5000)
.pipe(concatMap(vm.fetchHttptrace))
.subscribe({
next: traces => {
vm.hasLoaded = true;
......
......@@ -45,7 +45,7 @@
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import linkify from '@/utils/linkify';
import {animationFrame, Observable} from '@/utils/rxjs';
import {animationFrameScheduler, concatAll, concatMap, map, of, tap} from '@/utils/rxjs';
import AnsiUp from 'ansi_up';
import _ from 'lodash';
import prettyBytes from 'pretty-bytes';
......@@ -80,10 +80,12 @@
const vm = this;
vm.error = null;
return this.instance.streamLogfile(1000)
.do(chunk => vm.skippedBytes = vm.skippedBytes || chunk.skipped)
.concatMap(chunk => _.chunk(chunk.addendum.split(/\r?\n/), 250))
.map(lines => Observable.of(lines, animationFrame))
.concatAll()
.pipe(
tap(chunk => vm.skippedBytes = vm.skippedBytes || chunk.skipped),
concatMap(chunk => _.chunk(chunk.addendum.split(/\r?\n/), 250)),
map(lines => of(lines, animationFrameScheduler)),
concatAll()
)
.subscribe({
next: lines => {
vm.hasLoaded = true;
......
......@@ -62,7 +62,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, from, timer} from '@/utils/rxjs';
import _ from 'lodash';
import moment from 'moment';
import prettyBytes from 'pretty-bytes';
......@@ -146,12 +146,12 @@
}
},
fetchAllTags() {
return Observable.from(this.tagSelections).concatMap(this.fetchMetric);
return from(this.tagSelections).pipe(concatMap(this.fetchMetric));
},
createSubscription() {
const vm = this;
return Observable.timer(0, 2500)
.concatMap(vm.fetchAllTags)
return timer(0, 2500)
.pipe(concatMap(vm.fetchAllTags))
.subscribe({
next: () => {
}
......
......@@ -76,7 +76,7 @@
<script>
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, from, map, of, tap} from '@/utils/rxjs';
import prettyBytes from 'pretty-bytes';
export default {
......@@ -103,9 +103,11 @@
deleteAllSessions() {
const vm = this;
vm.deletingAll = 'deleting';
this.subscription = Observable.from(vm.sessions)
.map(session => session.id)
.concatMap(vm._deleteSession)
vm.subscription = from(vm.sessions)
.pipe(
map(session => session.id),
concatMap(vm._deleteSession)
)
.subscribe({
complete: () => {
vm.deletingAll = 'deleted';
......@@ -126,18 +128,20 @@
_deleteSession(sessionId) {
const vm = this;
vm.$set(vm.deleting, sessionId, 'deleting');
return Observable.of(sessionId)
.concatMap(async sessionId => {
await vm.instance.deleteSession(sessionId);
return sessionId;
})
.do({
next: sessionId => vm.$set(vm.deleting, sessionId, 'deleted'),
error: error => {
console.warn(`Deleting session ${sessionId} failed:`, error);
vm.$set(vm.deleting, sessionId, 'failed');
}
});
return of(sessionId)
.pipe(
concatMap(async sessionId => {
await vm.instance.deleteSession(sessionId);
return sessionId;
}),
tap({
next: sessionId => vm.$set(vm.deleting, sessionId, 'deleted'),
error: error => {
console.warn(`Deleting session ${sessionId} failed:`, error);
vm.$set(vm.deleting, sessionId, 'failed');
}
})
);
}
}
}
......
......@@ -34,7 +34,7 @@
<script>
import subscribing from '@/mixins/subscribing';
import Instance from '@/services/instance';
import {Observable} from '@/utils/rxjs';
import {concatMap, timer} from '@/utils/rxjs';
import _ from 'lodash';
import moment from 'moment-shortformat';
import threadsList from './threads-list';
......@@ -111,8 +111,8 @@
createSubscription() {
const vm = this;
vm.error = null;
return Observable.timer(0, 1000)
.concatMap(vm.fetchThreaddump)
return timer(0, 1000)
.pipe(concatMap(vm.fetchThreaddump))
.subscribe({
next: threads => {
vm.hasLoaded = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment