import { eventChannel, channel } from 'redux-saga';
import { take, all, put, call, fork, cancel, cancelled, join, apply, select, actionChannel, takeLatest, takeEvery } from 'redux-saga/effects';
import ReconnectingWebSocket from 'reconnecting-websocket';
import { Map, fromJS } from 'immutable';
import { isNil } from 'lodash';
import { dispatch } from 'src/store';

import * as autobahnActions from './action';
import * as authenticationActions from '../authentication/action';
import * as authSelector from '../authentication/selector';

import * as authService from 'src/service/auth';

// Registry of module channels keyed by channel name. Entries are created by
// watchRegisterChannel, looked up by main's reader worker to route incoming
// events, and flushed by reregisterChannels when the client changes.
const channels = {};
// The current websocket instance. Assigned by createSocket; null until the
// first connection has been created. It is not reset to null when closed.
let socket = null;

/**
 * Root saga of the Autobahn module.
 *
 * Forks the long-running watchers (start-on-authentication, authenticate,
 * channel registration, channel flushing and the connection lifecycle) and
 * then wires the authentication actions that should resubscribe or stop the
 * connection.
 */
export function * autobahnRoot () {
  // Any of these authentication events requests a (re)start of the connection.
  const startTriggers = [
    authenticationActions.AUTHENTICATED,
    authenticationActions.REHYDRATE_SUCCESS,
    authenticationActions.CHANGE_CLIENT_SUCCESS
  ];

  // Translates authentication events into an autobahn START action.
  function * startOnAuthentication () {
    for (;;) {
      yield take(startTriggers);
      yield put(autobahnActions.start());
    }
  }

  // Runs `main` between a START and the following STOP. START actions that
  // arrive while a connection is already running are not observed, because
  // this loop is waiting for STOP at that point.
  function * connectionLifecycle () {
    for (;;) {
      yield take(autobahnActions.START);
      const mainTask = yield fork(main);

      yield take(autobahnActions.STOP);
      try {
        if (socket) {
          yield apply(socket, socket.close, null);
        }
      } catch (closeError) {
        console.error('Unable to close the websocket', closeError); // eslint-disable-line no-console
      }

      yield cancel(mainTask);
    }
  }

  yield fork(startOnAuthentication);
  yield fork(authenticate);
  yield fork(watchRegisterChannel);
  yield fork(reregisterChannels);
  yield fork(connectionLifecycle);

  // A refreshed token must be re-sent to the server; a client change or an
  // authentication clear stops the connection.
  yield takeEvery(authenticationActions.REFRESH_AUTH_TOKEN_SUCCESS, resubscribe);
  yield takeLatest(authenticationActions.CHANGE_CLIENT, closeSocket);
  yield takeLatest(authenticationActions.CLEAR, closeSocket);
}

/**
 * Creates a ReconnectingWebSocket for the address configured in
 * `REACT_APP_AUTOBAHN_ADDR`, stores it in the module-level `socket`
 * variable and returns it.
 */
export function createSocket () {
  const address = process.env.REACT_APP_AUTOBAHN_ADDR;
  const connection = new ReconnectingWebSocket(address, [], {});

  socket = connection;
  return connection;
}

/**
 * Requests that the connection be stopped by dispatching the autobahn STOP
 * action. Failures are logged rather than rethrown.
 */
export function * closeSocket () {
  try {
    const stopAction = autobahnActions.stop();
    yield put(stopAction);
  } catch (stopError) {
    console.error('Unable to stop the Autobahn', stopError); // eslint-disable-line no-console
  }
}

/**
 * Wraps the socket's `message` events in a redux-saga event channel.
 *
 * Each message's `data` is emitted into the channel; errors thrown while
 * emitting are logged and swallowed. Closing the channel removes the
 * listener from the socket.
 *
 * @param socket object exposing addEventListener / removeEventListener
 * @returns the event channel created by `eventChannel`
 */
export function createReaderChannel (socket) {
  const subscribe = (emit) => {
    function onMessage (event) {
      try {
        emit(event.data);
      } catch (emitError) {
        console.error(emitError); // eslint-disable-line no-console
      }
    }

    socket.addEventListener('message', onMessage);

    // Unsubscribe callback handed back to eventChannel.
    return function unsubscribe () {
      socket.removeEventListener('message', onMessage);
    };
  };

  return eventChannel(subscribe);
}

/**
 * Watches for the autobahn AUTHENTICATE action and, for each one, publishes a
 * `dashboard/subscribe` message carrying the current access token and client
 * id.
 *
 * Each iteration is guarded by try/catch: this saga is forked as an attached
 * child of the root saga, so an uncaught error here (for example a failed
 * token lookup) would end this watcher for good and propagate to the parent.
 * Failures are logged and the watcher keeps listening, matching the
 * error-handling style of `resubscribe`.
 */
export function * authenticate () {
  while (true) {
    yield take([
      autobahnActions.AUTHENTICATE
    ]);

    try {
      // The result is yielded so the saga middleware can resolve it if it is
      // a promise (presumably it is — confirm against authService).
      const jwt = yield authService.getAccessToken();
      const clientId = yield select(authSelector.getClientId);

      yield put(autobahnActions.publish(
        'dashboard/subscribe',
        fromJS({
          jwt,
          clientId,
          platform: 'dashboard'
        })
      ));
    } catch (err) {
      console.error('Unable to authenticate with the Autobahn', err); // eslint-disable-line no-console
    }
  }
}

/**
 * Publishes a fresh `dashboard/subscribe` message with the current access
 * token and client id. Any failure is logged and swallowed.
 */
export function * resubscribe () {
  try {
    const accessToken = yield authService.getAccessToken();
    const currentClientId = yield select(authSelector.getClientId);

    const subscription = fromJS({
      jwt: accessToken,
      clientId: currentClientId,
      platform: 'dashboard'
    });
    const publishAction = autobahnActions.publish('dashboard/subscribe', subscription);

    yield put(publishAction);
  } catch (resubscribeError) {
    console.error('Unable to resubscribe to the Autobahn', resubscribeError); // eslint-disable-line no-console
  }
}

/**
 * Runs one websocket connection: creates the socket, then forks a reader
 * worker (socket -> module channels) and a writer worker (PUBLISH actions ->
 * socket) and waits on them.
 *
 * When this saga is cancelled, both the reader event channel and the PUBLISH
 * action channel are closed and the workers are cancelled.
 */
export function * main () {
  yield call(createSocket);
  socket.addEventListener('open', async () => {
    // dispatch the authenticate action to send the subscribe message on any open/reconnect events
    await dispatch(autobahnActions.authenticate(), null);
  });


  const readerChannel = yield call(createReaderChannel, socket);
  const writerChannel = yield actionChannel(autobahnActions.PUBLISH);

  const channelWorkers = yield all([
    // Reader worker: parses each incoming message and routes it to the
    // registered module channel and/or the system channel.
    fork(function * () {
      while (true) {
        const data: any = yield take(readerChannel);

        try {
          const autobahnEvent: any = fromJS(JSON.parse(data));

          // filter by client
          const currentClientId = yield select(authSelector.getClientId);
          const eventClientIds = autobahnEvent.getIn(['payload', 'clientIds']);
          if (!isNil(eventClientIds) && !eventClientIds.includes(currentClientId)) {
            // Skip only this event. A `return` here would end the reader
            // worker and silently drop every later message on this connection.
            continue;
          }

          // The event type is a '/'-separated path; its first segment selects
          // the routing below. Both shift() calls mutate the path array that
          // is also forwarded in the payload.
          const autobahnEventNamePath = autobahnEvent.get('type').split('/');
          const autobahnEventType = autobahnEventNamePath.shift();
          const payload = {
            autobahnEvent,
            currentClientId,
            autobahnEventNamePath
          };

          let channelName;
          switch (autobahnEventType) {
            case 'data':
              // For 'data' events the second path segment names the channel.
              channelName = autobahnEventNamePath.shift();
              break;

            default:
              channelName = 'system';
              break;
          }

          const moduleChannel = channels[channelName];
          if (moduleChannel) {
            yield put(moduleChannel, payload);
          }

          // Events not routed to the system channel are additionally announced
          // on it. Guarded like the module channel above so an unregistered
          // system channel is skipped instead of surfacing as a parse failure.
          const systemChannel = channels['system'];
          if (channelName !== 'system' && systemChannel) {
            yield put(systemChannel, {
              autobahnEvent: autobahnEvent.set('type', 'system/frog-message-success'),
              autobahnEventNamePath,
              payload
            });
          }
        } catch (err) {
          console.error(`failed to parse payload. Error: ${err}`, { err }); // eslint-disable-line no-console
        }
      }
    }),

    // Writer worker: serializes each buffered PUBLISH action and sends it over
    // the socket.
    fork(function * () {
      while (true) {
        const action = yield take(writerChannel);

        try {
          // `action.payload` is expected to expose toJS() (an empty Map is
          // used when absent); its keys are spread next to the `message` envelope.
          const event = JSON.stringify({
            message: { type: action.name || null },
            ...(action.payload || Map()).toJS()
          });

          yield apply(socket, socket.send, [event]);
        } catch (err) {
          console.error(`failed to send payload. Error: ${err}`, { err }); // eslint-disable-line no-console
        }
      }
    })
  ]);

  try {
    yield join(channelWorkers);
  } catch (err) {
    console.error('main error', err); // eslint-disable-line no-console
  } finally {
    if (yield cancelled()) {
      readerChannel.close();
      // Without closing it, the action channel stays subscribed to PUBLISH
      // and keeps buffering actions that no worker will ever consume.
      writerChannel.close();

      yield cancel(channelWorkers);
    }
  }
}

/**
 * Handles REGISTER_CHANNEL actions: the first registration for a name creates
 * a channel, stores it in `channels` and forks the supplied consumer with it.
 * Later registrations for the same name only log a warning.
 */
function * watchRegisterChannel () {
  for (;;) {
    const registration = yield take(autobahnActions.REGISTER_CHANNEL);
    const { name, channelConsumer } = registration;

    if (channels[name]) {
      console.warn(`${name} channel has already been registered, not resistering a new one`); // eslint-disable-line no-console
      continue;
    }

    channels[name] = yield call(channel);
    yield fork(channelConsumer, channels[name]);
  }
}

/**
 * On every CHANGE_CLIENT action, empties all registered channels so that
 * messages buffered for the previous client are discarded.
 */
function * reregisterChannels () {
  // The flushed items are intentionally ignored.
  const discard = () => null;

  for (;;) {
    yield take(authenticationActions.CHANGE_CLIENT);

    for (const channelName of Object.keys(channels)) {
      // flush channel to discard any unprocessed messages
      yield channels[channelName].flush(discard);
    }
  }
}
