Добавление Socket.io к многопоточному Node.js

dobavlenie socketio k mnogopotochnomu nodejs

Одним из недостатков Node является то, что он однопоточный. Конечно, есть способ обойти это – а именно модуль под названием кластер. Cluster позволяет нам распространять наше приложение на несколько потоков.

Однако теперь возникает новая проблема. Смотрите, наш код, который выполняется в нескольких экземплярах, действительно имеет некоторые существенные недостатки. Одно из них – отсутствие глобальных государств.

Обычно в однопоточном экземпляре это не вызывает особой обеспокоенности. Для нас сейчас это все меняет.

Давайте посмотрим почему.

Итак, в чем проблема?

Наша программа представляет собой простой онлайн-чат, работающий в четырех потоках. Это позволяет пользователю одновременно войти в систему на вашем телефоне и компьютере.

Представьте себе, что у нас есть сокеты, настроенные точно так, как мы бы установили их для одного потока. Другими словами, сейчас у нас есть одно крупное глобальное государство с сокетами.

Когда пользователь входит на компьютер, веб-сайт открывает соединение с экземпляром Socket.io на нашем сервере. Розетка хранится в резьбе №3.

А теперь представьте, что пользователь идет на кухню, чтобы перекусить, и берет с собой телефон, он, естественно, хочет продолжать обмениваться сообщениями со своими друзьями в Интернете.

Их телефон подключается к потоку №4 и сокет сохраняется в состоянии потока.

Отправка сообщения с телефона не принесет пользователю никакой пользы. Сообщения смогут увидеть только люди по теме №3. Это потому, что сокеты, сохраненные в потоке №3, не хранятся как-то волшебным образом в потоках №1, №2 и №4.

Как ни странно, даже сам пользователь не увидит своих сообщений на своем компьютере, вернувшись из кухни.

Конечно, когда они обновляют веб-сайт, мы можем отправить запрос GET и получить последние 50 сообщений, но мы не можем сказать, что это «динамический» способ, не правда ли?

Почему это происходит?

Распространение нашего сервера на несколько потоков определенным образом равносильно наличию нескольких отдельных серверов. Они не знают о существовании друг друга и, конечно, не имеют никаких воспоминаний. Это означает, что объект в одном экземпляре не существует в другом.

Розетки, сохраненные в потоке №3, не обязательно являются всеми розетками, используемыми пользователем на данный момент. Если друзья пользователя находятся в разных цепях, они не увидят пользовательские сообщения, если не обновят веб-сайт.

В идеале мы хотели бы известить другие экземпляры о пользовательском событии. Таким образом, мы можем быть уверены, что каждое подключенное устройство получает обновление в реальном времени.

Решение

Мы можем извещать другие потоки посредством парадигмы Redis публиковать/подписываться (pubsub).

Redis является открытым исходным кодом (BSD-лицензионная) структура данных в памяти магазин. Его можно использовать в качестве базы данных, кэша и брокера сообщений.

Это означает, что мы можем использовать Redis для распределения событий между нашими экземплярами.

Обратите внимание, что обычно мы, вероятно, сохраняем всю нашу структуру внутри Redis. Однако поскольку структура не сериализируется и ее нужно поддерживать «живой» в памяти, мы собираемся хранить ее часть в каждом экземпляре.

Поток

Давайте теперь подумаем о шагах, на которых мы собираемся обрабатывать входящее событие.

  1. Событие звонило сообщение приходит в одну из наших розеток – таким образом, нам не придется прослушивать все возможные события.
  2. Внутри объекта, переданного обработчику этого события в качестве аргумента, мы можем найти имя события. Например, отправить сообщение.on('message', ({ event }) =>{}).
  3. Если для этого имени есть обработчик, мы собираемся его выполнить.
  4. Обработчик может выполняться отправление с ответом.
  5. Отправка посылает событие ответа к нашей публикации Redis. Оттуда это достается излучается к каждому нашему случаю.
  6. Каждый экземпляр передает его в свой socketsState, гарантируя, что каждый подключенный клиент получит событие.

Кажется, это сложно, я знаю, но потерпите.

Реализация

Вот репозиторий с готовой средой, чтобы нам не пришлось устанавливать и настраивать самостоятельно.

Сначала мы собираемся настроить сервер с Экспресс.

import * as moduleAlias from 'module-alias';

moduleAlias.addAliases({
  src: __dirname,
});

import * as express from 'express';
import * as http from 'http';
import * as socketio from 'socket.io';

const port = 7999;

const app = express();
const server = http.createServer(app);
const io = initSocket(socketio(server).of('/socket'));

server.listen(port, () => {
  console.log(`Listening on port ${port}.`);
});

Мы создаем приложение Express, HTTP-сервер и инициальные сокеты.

Теперь мы можем сосредоточиться на добавлении сокетов.

Мы проходим Экземпляр сервера Socket.io для нашей функции, в которой мы устанавливаем промежуточное программное обеспечение.

const initSocket = (instance: socketio.Namespace): socketio.Namespace =>
  instance.use(onAuth).use(onConnection);

onAuth

The onAuth функция просто имитирует фиктивную авторизацию. В нашем случае это на основе токенов.

Лично я вероятно заменил бы его на JWT в будущем, но он никоим образом не выполняется.

const onAuth: SocketMiddleware = (socket, next) => {
  const { token, id }: { token: string; id: string } =
    socket.request._query || socket.request.headers;

  if (!token) {
    return next(new Error('Authorization failed, no token has been provided!'));
  }

  // mock
  const user = checkToken(token, id);

  socket.user = user;

  return next();
};

Теперь перейдем к onConnection промежуточное программное обеспечение

onConnection

const onConnection: SocketMiddleware = (socket, next) => {
  if (!socket.user) {
    return next(new Error('Something went wrong.'));
  }

  const { id } = socket.user;

  socketsState.add(id, socket);

  socket.on('message', ({ event, args }) => {
    const handler = handlers[event];

    if (!handler) {
      return null;
    }

    return handler && handler({ id, args });
  });

  socket.on('disconnect', () => {
    return socketsState.remove(id, socket);
  });

  return next();
};

Здесь мы видим, что мы получаем пользовательские данные idкоторый был установлен в предыдущем промежуточном программном обеспечении, и сохраните его в нашем socketsState, при этом ключ будет идентификатором, а значением – массивом сокетов.

Дальше слушаем сообщение событие. Вся наша логика основана на этом – каждое событие, которое присылает нам интерфейс, будет называться: сообщение.

Имя события будет отправлено внутри объекта arguments – как указано выше.

Обработчики

Как вы можете видеть в onConnection, в частности в прослушивающем событии сообщения, мы ищем обработчик на основе названия события.

Наши обработчики это просто объект, ключом которого является имя происшествия, а значение – функция. Мы будем использовать его для прослушивания событий и ответного реагирования.

const dispatchTypes = {
  MESSAGE_SENT: 'message_sent',
  POST_UPDATED_NOTIFICATION: 'post_updated_notification',
};

interface Handlers {
  [key: string]: ({ id, args }: { id: string; args: any }) => any;
}

const handlers: Handlers = {
  sendMessage: async ({ id, args }) => {
    // await sendMessageToUser();

    dispatch({
      id,
      event: dispatchTypes.MESSAGE_SENT,
      args: {
        message: `A message from user with id: ${id} has been send`,
      },
    });
  },
  postUpdated: async ({ id, args }) => {
    dispatch({
      id,
      event: dispatchTypes.POST_UPDATED_NOTIFICATION,
      args: {
        message: 'A post you have been mentioned in has been updated',
      },
    });
  },
};

export = handlers;

Кроме того, позже мы собираемся добавить отправление функцию и используйте ее для отправки события между экземплярами.

SocketsState

Мы знаем интерфейс нашего государства, но мы его еще не внедрили.

Добавляем способы для добавления и удаления сокета, также для излучения действия.

import * as socketio from 'socket.io';

interface SocketsState {
  [id: string]: socketio.Socket[];
}

const socketsState: SocketsState = {};

const add = (id: string, socket: socketio.Socket) => {
  if (!socketsState[id]) {
    socketsState[id] = [];
  }

  socketsState[id] = [...socketsState[id], socket];

  return socketsState[id];
};

const remove = (id: string, socket: socketio.Socket) => {
  if (!socketsState[id]) {
    return null;
  }

  socketsState[id] = socketsState[id].filter((s) => s !== socket);

  if (!socketsState[id].length) {
    socketsState[id] = undefined;
  }

  return null;
};

const emit = ({
  event,
  id,
  args,
}: {
  event: string;
  id: string;
  args: any;
}) => {
  if (!socketsState[id]) {
    return null;
  }

  socketsState[id].forEach((socket) =>
    socket.emit('message', { event, id, args }),
  );

  return null;
};

export { add, remove, emit };

The добавить функция проверяет, имеет ли состояние свойство, равное идентификатору пользователя. Если это так, то мы просто добавляем его в наш уже существующий массив. В противном случае мы сначала создаем новый массив.

The Удалить функция также проверяет, имеет ли состояние идентификатор пользователя в своих свойствах. Если нет – ничего не делает. В противном случае он фильтрует массив, чтобы удалить сокет из массива. Затем, если массив пуст, он удаляет его из состояния, устанавливая для свойства значение неопределенный.

Паб Redis

За создание наших pubsub мы будем использовать пакет под названием node-redis-pubsub.

import * as NRP from 'node-redis-pubsub';

const client = new NRP({
  port: 6379,
  scope: 'message',
});

export = client;

Добавление отправления

Ладно, теперь все, что осталось сделать, это добавить функцию отправки…

const dispatch = ({
  event,
  id,
  args,
}: {
  event: string;
  id: string;
  args: any;
}) => pubsub.emit('outgoing_socket_message', { event, id, args });

…и добавьте слушателя для outgoing_socket_message. Таким образом, каждый экземпляр получает событие и посылает его в сокеты пользователя.

pubsub.on('outgoing_socket_message', ({ event, id, args }) =>
  socketsState.emit({ event, id, args }),
);

Сделать все это многопоточным

В конце концов, давайте добавим код, нужный для того, чтоб наш сервер был многопоточным.

import * as os from 'os';
import * as cluster from 'cluster';

const spawn = () => {
  const numWorkes = os.cpus().length;

  for (let i = 0; i < numWorkes; i += 1) {
    cluster.fork();
  }

  cluster.on('online', () => {
    console.log('Worker spawned');
  });

  cluster.on('exit', (worker, code, status) => {
    if (code === 0 || worker.exitedAfterDisconnect) {
      console.log(`Worker ${worker.process.pid} finished his job.`);
      return null;
    }

    console.log(
      `Worker ${
        worker.process.pid
      } crashed with code ${code} and status ${status}.`,
    );
    return cluster.fork();
  });
};

export { spawn };
import * as moduleAlias from 'module-alias';

moduleAlias.addAliases({
  src: __dirname,
});

import * as express from 'express';
import * as http from 'http';
import * as cluster from 'cluster';
import * as socketio from 'socket.io';
import * as killPort from 'kill-port';
import { initSocket } from 'src/common/socket';
import { spawn } from 'src/clusters';

const port = 7999;

if (cluster.isMaster) {
  killPort(port).then(spawn);
} else {
  const app = express();
  const server = http.createServer(app);
  const io = initSocket(socketio(server).of('/socket'));

  server.listen(port, () => {
    console.log(`Listening on port ${port}.`);
  });
}

Примечание: мы должны убить порт, потому что после выхода из нашего Nodemon процесс с помощью Ctrl+c он просто зависает.

С небольшими настройками теперь у нас есть рабочие сокеты для всех экземпляров. Как результат: гораздо более эффективный сервер.

Спасибо, что читаете!

Я понимаю, что поначалу все может показаться непосильным и напряженным, чтобы принять все сразу. Имея это в виду, я настоятельно призываю вас еще раз прочесть код полностью и обдумать его в целом.

Если у вас есть вопросы или комментарии, не стесняйтесь оставлять их в разделе комментариев ниже или отправлять мне сообщение.

Просмотрите мои социальные сети!

Присоединяйтесь к моей рассылке!

Первоначально опубликовано на www.mcieslar.com 10 сентября 2018 года.

Добавить комментарий

Ваш адрес email не будет опубликован.