store.js 2.78 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright 2014-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Application from '@/services/application';
import {Observable} from '@/utils/rxjs';

/**
 * In-memory store of applications that mirrors the server-side state and
 * notifies registered listeners about changes.
 *
 * Event types pre-registered by the constructor are `added`, `updated` and
 * `removed`; `start()` additionally dispatches `connected` and `error`.
 * Listeners for any other type may be registered as well.
 */
export default class ApplicationStore {
  constructor() {
    // Applications currently known to the store, in order of arrival.
    this.applications = [];
    // Maps an event type to the array of listeners registered for it.
    this._listeners = {
      added: [],
      updated: [],
      removed: []
    };
  }

  /**
   * Tells whether a listener list exists for the given event type.
   *
   * Only own properties count, so event types that collide with names
   * inherited from Object.prototype (e.g. 'toString') are handled like any
   * other type instead of resolving to the inherited member.
   *
   * @param {string} type - event type
   * @returns {boolean} true if a listener list exists for `type`
   */
  _hasListenerList(type) {
    return Object.prototype.hasOwnProperty.call(this._listeners, type);
  }

  /**
   * Looks up the position of an application by its name.
   *
   * @param {string} name - application name to search for
   * @returns {number} index within `applications`, or -1 if not present
   */
  indexOf(name) {
    return this.applications.findIndex(application => application.name === name);
  }

  /**
   * Registers a listener for an event type. The listener list for the type
   * is created on demand.
   *
   * @param {string} type - event type
   * @param {Function} listener - callback invoked with the store as `this`
   */
  addEventListener(type, listener) {
    if (!this._hasListenerList(type)) {
      this._listeners[type] = [];
    }
    this._listeners[type].push(listener);
  }

  /**
   * Removes a previously registered listener. Unknown types and listeners
   * that were never registered are ignored.
   *
   * @param {string} type - event type
   * @param {Function} listener - the callback passed to addEventListener
   */
  removeEventListener(type, listener) {
    if (!this._hasListenerList(type)) {
      return;
    }

    const listeners = this._listeners[type];
    const idx = listeners.indexOf(listener);
    // indexOf yields -1 when absent; index 0 is a valid hit and must be removed too.
    if (idx >= 0) {
      listeners.splice(idx, 1);
    }
  }

  /**
   * Invokes all listeners registered for an event type, in registration
   * order, with the store bound as `this`.
   *
   * @param {string} type - event type
   * @param {...*} args - arguments handed to every listener
   */
  dispatchEvent(type, ...args) {
    if (!this._hasListenerList(type)) {
      return;
    }
    this._listeners[type].forEach(
      listener => listener.call(this, ...args)
    );
  }

  /**
   * Subscribes to the application listing followed by the application
   * stream and keeps `applications` in sync with the received items.
   *
   * The subscription is stored in `this.subscription`. When the observable
   * errors, an `error` event is dispatched and the source is re-subscribed
   * after a delay of 5000 ms.
   */
  start() {
    // Initial listing; the request is deferred until subscription time so a
    // retry issues a fresh request. NOTE(review): message.data is presumably
    // an array of applications that concatMap flattens - confirm.
    const listing = Observable.defer(() => Application.list()).concatMap(message => message.data);
    const stream = Application.getStream().map(message => message.data);
    this.subscription = listing.concat(stream)
      // NOTE(review): doFirst is a project operator from '@/utils/rxjs';
      // presumably it runs the callback on the first emission - confirm.
      .doFirst(() => this.dispatchEvent('connected'))
      .retryWhen(errors => errors
        .do(error => this.dispatchEvent('error', error))
        .delay(5000)
      ).subscribe({
        next: application => {
          const idx = this.indexOf(application.name);
          if (idx >= 0) {
            const oldApplication = this.applications[idx];
            if (application.instances.length > 0) {
              // Known application that still has instances: replace it in place.
              this.applications.splice(idx, 1, application);
              this.dispatchEvent('updated', application, oldApplication);
            } else {
              // Known application without instances left: drop it.
              this.applications.splice(idx, 1);
              this.dispatchEvent('removed', oldApplication);
            }
          } else {
            this.applications.push(application);
            this.dispatchEvent('added', application);
          }
        }
      });
  }

  /**
   * Cancels the subscription created by `start()`, if any. The stored
   * subscription is cleared even when unsubscribing throws.
   */
  stop() {
    if (this.subscription) {
      try {
        if (!this.subscription.closed) {
          this.subscription.unsubscribe();
        }
      } finally {
        this.subscription = null;
      }
    }
  }
}