import { Injectable, NgZone } from '@angular/core';
import { MarketConditionsService, WebSocketClientService, GyakuhibuTop } from '@argentumcode/brisk-common';
import { ApiService } from './api.service';
import { HttpParams } from '@angular/common/http';
import { differenceInSeconds, format } from 'date-fns';
import { Observable, ReplaySubject, Subscription, timer } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

/**
 * Error signalled to the market stream subscriber when the heartbeat check
 * finds that the last heartbeat message is too old.
 */
class MarketHeartbeatError extends Error {
  constructor() {
    super('MarketHeartbeatError');
    // When TypeScript targets ES5, calling `super()` on a built-in such as
    // Error returns a plain Error object, which breaks `instanceof` checks.
    // Restoring the prototype explicitly keeps them working on every target.
    Object.setPrototypeOf(this, new.target.prototype);
    // Without this, `name` is inherited from Error and reads 'Error'.
    this.name = 'MarketHeartbeatError';
  }
}

/**
 * Loads market-condition updates for one (series, date) pair and exposes them
 * as an Observable of arrays.
 *
 * The service remembers how far it has read (`indexFrom`) so that a later
 * `start()` call for the same series and date only requests newer entries.
 */
@Injectable({
  providedIn: 'root',
})
export class MarketsService {
  // Every entry whose index is strictly below indexFrom has been fully received.
  private indexFrom = 0;

  // Date of the data that indexFrom currently refers to.
  private date: Date;

  // Series identifier of the data that indexFrom currently refers to.
  private series: number;

  public marketError: Error = null;

  // NOTE(review): ReplaySubject without a buffer size replays every past
  // emission to each new subscriber — confirm that this is intended.
  private jsfcUpdateSubject = new ReplaySubject<Set<number>>();
  public jsfcUpdate$ = this.jsfcUpdateSubject.asObservable();

  // Result of prepareGyakuhibuTop(), cached until clear() is called.
  private gyakuhibuTop: Array<GyakuhibuTop> = null;
  // True once gyakuhibuTop has been emitted to a subscriber of start().
  private gyakuhibuTopSent = false;

  constructor(
    private api: ApiService,
    private ws: WebSocketClientService,
    private _zone: NgZone,
    private marketCondition: MarketConditionsService
  ) {}

  /**
   * Resets the read position and the cached gyakuhibu data.
   */
  clear() {
    this.indexFrom = 0;
    this.gyakuhibuTop = null;
    this.gyakuhibuTopSent = false;
  }

  /**
   * Starts loading market conditions for the given series and date.
   *
   * If the series or the calendar day differs from the previous call, the
   * cached state is cleared first so that loading restarts from index 0.
   *
   * @param series Series identifier passed to the API.
   * @param date Day to load; only its 'YYYY-MM-DD' formatting is used.
   * @param realtime Currently ignored: it is overwritten with `false` below,
   *   so the one-shot REST path is always taken.
   * @param backoffReset Optional callback invoked on every heartbeat message
   *   in the realtime path.
   * @returns An Observable that emits the prepared gyakuhibu data (at most
   *   once until `clear()`) and arrays of market-condition entries.
   */
  public start(series: number, date: Date, realtime: boolean, backoffReset: Function): Observable<Array<any>> {
    // NOTE(review): this unconditionally disables the realtime (WebSocket)
    // branch below, making it unreachable. Kept as-is because it looks like a
    // deliberate switch-off — confirm before removing.
    realtime = false;
    if (this.series !== series || format(this.date, 'YYYY-MM-DD') !== format(date, 'YYYY-MM-DD')) {
      this.clear();
      this.series = series;
      this.date = date;
    }
    if (!realtime) {
      // One-shot path: a single REST request starting at indexFrom.
      return new Observable((sub) => {
        const subscription = this.api.markets(series, format(date, 'YYYY-MM-DD'), this.indexFrom).subscribe(
          (arr) => {
            // Advance the read position past every received entry.
            for (const m of arr['market_conditions']) {
              this.indexFrom = m.index + 1;
            }
            if (arr['gyakuhibu']) {
              if (!this.gyakuhibuTop) {
                this.gyakuhibuTop = this.marketCondition.prepareGyakuhibuTop(arr['gyakuhibu']);
              }
              // Emit the gyakuhibu data only once, ahead of the market conditions.
              if (!this.gyakuhibuTopSent && this.gyakuhibuTop) {
                sub.next(this.gyakuhibuTop);
                this.gyakuhibuTopSent = true;
              }
            }
            sub.next(arr['market_conditions']);
            sub.complete();
          },
          (err) => {
            sub.error(err);
          }
        );
        return () => {
          subscription.unsubscribe();
        };
      });
    } else {
      // Realtime path: fetch a token, open a WebSocket, and backfill the gap
      // between indexFrom and the socket's first index through the REST API.
      return this.api.marketToken().pipe(
        mergeMap((ret) => {
          return new Observable<Array<any>>((sub) => {
            const params = new HttpParams().set('date', format(date, 'YYYY-MM-DD')).set('series', series.toString());
            const wsUrl = this.api.getWsEndpoint(`/api/market-realtime?${params.append('auth_token', ret.token).toString()}`);
            const ws = this.ws.connect(wsUrl);
            // Entries that arrive over the socket before the backfill finishes.
            const waitQueue = [];
            let wsLoadFinished = false;
            let apiSubscription: Subscription;
            let marketHeartbeatCheckSubscription: Subscription;
            let marketLastHeartbeat = new Date();
            console.log(`Market Open`);
            ws.subscribe(
              (data) => {
                // A frame may carry a single message or an array of messages.
                let jsonArray = JSON.parse(data.data);
                if (!Array.isArray(jsonArray)) {
                  jsonArray = [jsonArray];
                }
                for (const json of jsonArray) {
                  if (json.message_type === 'subscribe_message') {
                    apiSubscription = this.api
                      .markets(series, format(date, 'YYYY-MM-DD'), this.indexFrom, json.from_index)
                      .subscribe(
                        (arr) => {
                          for (const m of arr['market_conditions']) {
                            this.indexFrom = m.index + 1;
                          }
                          sub.next(arr['market_conditions']);
                          wsLoadFinished = true;
                          for (const m of waitQueue) {
                            this.indexFrom = m.index + 1;
                          }
                          if (!this.gyakuhibuTop && arr['gyakuhibu']) {
                            this.gyakuhibuTop = this.marketCondition.prepareGyakuhibuTop(arr['gyakuhibu']);
                          }
                          // Detach the queued entries before emitting them so the
                          // emitted array is never mutated after delivery.
                          const queued = waitQueue.splice(0, waitQueue.length);
                          sub.next(queued);
                          if (!this.gyakuhibuTopSent && this.gyakuhibuTop) {
                            sub.next(this.gyakuhibuTop);
                            this.gyakuhibuTopSent = true;
                          }
                        },
                        (err) => {
                          // Surface a failed backfill to the subscriber instead
                          // of leaving it as an unhandled error.
                          sub.error(err);
                        }
                      );
                  } else if (json.message_type === 'heartbeat') {
                    marketLastHeartbeat = new Date();
                    if (backoffReset) {
                      backoffReset();
                    }
                  } else if (json.message_type === 'jsfc') {
                    this.jsfcUpdateSubject.next(new Set<number>(json.issue_codes));
                  } else if (json.message_type === 'gyakuhibu_top') {
                    if (!this.gyakuhibuTop) {
                      this.gyakuhibuTop = this.marketCondition.prepareGyakuhibuTop(json['gyakuhibu']);
                    }
                    // Hold the emission until the backfill is done, and never
                    // emit (or mark as sent) a missing value.
                    if (wsLoadFinished && !this.gyakuhibuTopSent && this.gyakuhibuTop) {
                      sub.next(this.gyakuhibuTop);
                      this.gyakuhibuTopSent = true;
                    }
                  } else if (json.message_type === 'exit') {
                    sub.complete();
                  } else if (!json.message_type) {
                    // Messages without a type are market-condition entries.
                    if (!wsLoadFinished) {
                      waitQueue.push(json);
                    } else {
                      // Skip entries that were already received.
                      if (json.index >= this.indexFrom) {
                        this.indexFrom = json.index + 1;
                        sub.next([json]);
                      }
                    }
                  }
                }
              },
              (err) => {
                console.log(`Market Closed`, err);
                sub.error(err);
              },
              () => {
                sub.complete();
              }
            );

            // The periodic check is created outside the Angular zone; the
            // check itself is executed through runGuarded.
            this._zone.runOutsideAngular(() => {
              marketHeartbeatCheckSubscription = timer(10 * 1000, 60 * 1000).subscribe(() => {
                this._zone.runGuarded(() => {
                  // Fail the stream when the last heartbeat is older than 60 seconds.
                  if (differenceInSeconds(new Date(), marketLastHeartbeat) > 60) {
                    sub.error(new MarketHeartbeatError());
                  }
                });
              });
            });
            // Teardown: close the socket and stop the backfill and heartbeat check.
            return () => {
              ws.complete();
              ws.unsubscribe();
              if (apiSubscription) {
                apiSubscription.unsubscribe();
              }
              if (marketHeartbeatCheckSubscription) {
                marketHeartbeatCheckSubscription.unsubscribe();
              }
            };
          });
        })
      );
    }
  }
}
