Статья также доступна на украинском (перейти к просмотру).
Оглавление
- Что такое брокер сообщений
- Особенности Apache Kafka
- Создание условий и программной среды для установки сервера Apache Kafka
- Загрузка и размещение программы на сервере Ubuntu
- Конфигурирование сервера Apache Kafka
- Запуск сервера Apache Kafka
- Тестирование сервера Apache Kafka
- Повышение уровня безопасности при работе с кластером Kafka
Использование брокеров сообщений позволяет обеспечить быструю связь между компонентами распределенных систем, упростить масштабирование. Существует несколько типов брокеров, нацеленных на выполнение определенных задач. В частности, для систем обмена сообщениями для публикации или подписки критическим показателем является скорость обработки данных, требующая наличия брокера с соответствующими характеристиками. Apache Kafka наиболее полно отвечает указанному и некоторым другим критериям, что сделало его довольно популярным средством в указанном сегменте. Рассмотрим технические возможности и продемонстрируем работу с ним на практике.
Что такое брокер сообщений
Это программный модуль, выполняющий посреднические функции в коммуникациях между компонентами распределенных систем. Основными элементами любого брокера является издатель и их получатель или подписчик. Брокер определяет протокол взаимодействия между ними, то есть устанавливает правила, регулирующие очередность и время формирования сообщений, время их хранения на сервере и некоторые другие.
Брокеры позволяют организовать асинхронный режим обмена данными и могут быть полезны в следующих случаях:
- Для реализации отложенного выполнения действий;
- Сглаживание пиковых нагрузок;
- Отправка большого количества сообщений;
- Реализации сложной бизнес-логики распределенных систем;
- Обеспечение персистентности данных;
- Организации выполнения задач по расписанию.
Наиболее популярными брокерами, кроме Apache Kafka, являются RabbitMQ и Redis, каждый из которых ориентирован на определенное направление использования.
Особенности Apache Kafka
Программное средство является одним из самых быстрых среди существующих брокеров, скорость обработки сообщений составляет около 1 млн. в секунду. Кроме того, можно выделить следующие его особенности:
- Автоматическое информирование клиентов (подписчиков) о наличии уведомлений;
- Генерирование «неограниченного» количества сообщений;
- Обеспечение персистентности данных – возможность обновить или прочесть сообщение позже;
- Гарантирование четкой последовательности получения сообщений подписчиками в пределах топиков.
Указанные особенности программы позволяют ей занимать первые позиции при использовании в системах публикации или подписки, о чем уже говорилось выше.
Создание условий и программной среды для установки сервера Apache Kafka
Основными требованиями для успешной установки и работы сервера Apache Kafka есть следующие:
- Наличие Linux-сервера под управлением ОС Ubuntu;
- Минимум 4 Gb оперативной памяти (ОП) компьютера;
- Учетная запись «не root» пользователя с правами sudo;
- OpenJDK версии 11 или выше.
Нами будет использован сервер под управлением ОС Ubuntu 22.04, обычно используемый для работы VPS-серверів. Он имеет 4 Gb ОП и потому первые два требования нами выполнены. Теперь сконцентрируемся на исполнении последних двух требований.
Создание пользователя с правами sudo
Создадим пользователя с именем testing_kaf. Для этого наберем в терминале следующую команду:
$ sudo adduser testing_kaf
Пользователь успешно создан. Добавим его в группу пользователей с правами sudo:
$ sudo adduser testing_kaf sudo
Результат – Adding user testing_kaf to group sudo Done (Пользователь testing_kaf добавлен в группу sudo).
Переключимся на созданную учетную запись, не выходя из системы:
$ su -l testing_kaf
Следовательно, пользователь testing_kaf с правами sudo является активным. Одно условие выполнено.
Развертывание OpenJDK
Программа Apache Kafka создана на языке Java и поэтому для полноценного ее функционирования необходимо наличие виртуальной машины JVM. Начнем процесс ее создания в нашей среде.
Для начала обновим индекс программных пакетов Ubuntu:
$ sudo apt update
Проверим имеющиеся в репозитории версии Java:
$ java -version
ОВыберем версию 2:1.11-72build2 и начнем процесс инсталляции. Набираем в терминале команду для установки среды выполнения JRE по умолчанию. Она установит JRE із OpenJDK 11:
$ sudo apt install default-jre
Соглашаемся с выделением дополнительных 366 Mb дискового пространства, введя кнопку Y. После этого инициируется процесс загрузки пакетов.
Проверим корректность инсталляции JRE с помощью следующей команды:
$ java -version
Итак, JRE установлено. Теперь установим Java Development Kit (JDK), необходимое для компиляции и запуска определенных типов приложений на основе Java. Для этого введем в терминале:
$ sudo apt install default-jdk
Установка завершена.Убедимся в том, что компонент JDK установлен с помощью определения версии компилятора javac:
$ javac -version
Итак, все хорошо – компонент успешно установлен.
Загрузка и размещение программы на сервере Ubuntu
Воспользуемся ссылкой на сайт apache.org для загрузки кода Apache Kafka на наш сервер Наберем в терминале:
$ wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
Файл с кодом получен. Распаковываем его с помощью следующей команды:
$ tar -xzf kafka_2.13-2.7.0.tgz
Перейдем в каталог программы:
$ cd kafka_2.13-2.7.0
Конфигурирование сервера Apache Kafka
Конфигурирование любого сервера, VPS, или другого является неотъемлемым условием сбалансированной работы всей системы. В данном случае это включает установку нужных значений параметров в конфигурационном файлеserver.properties и создание необходимых unit-файлів для подсистемы управления службами systemd.
Сначала внесем изменения в конфигурационный файл. Изменения будут касаться двух опций – включения возможности удаления топиков или тем (параметр delete.topic.enable), а также фиксацию местонахождения файлов журналов (параметр log.dirs).
Откроем файл с помощью редактора и внесем соответствующие изменения:
$ nano config/server.properties
Для первого параметра:
log.dirs=/home/testing_kaf/logs
Для второго параметра:
delete.topic.enable = true
Сохраним внесенные изменения и выйдем из редактора.
Следующим шагом будет создание unit-файлов модулей systemd для службы Kafka. Они обеспечивают такие действия, как запуск, остановка и перезагрузка службы. Кроме того, Kafka использует службу Apache ZooKeeper, обеспечивающую управление состояниями и конфигурациями кластера. И поэтому для нее тоже нужно будет создать unit-файл.
Создадим сначала unit-файл для ZooKeeper. Для этого с помощью редактора создадим пустой файл, в который внесем соответствующий код, приведенный ниже.
Вызов редактора:
$ sudo nano /etc/systemd/system/zookeeper.service
Код:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=testing_kaf ExecStart=/home/testing_kaf/kafka_2.13-2.7.0/bin/zookeeper-server-start.sh /home/testing_kaf/kafka_2.13-2.7.0/config/zookeeper.properties ExecStop=/home/testing_kaf/kafka_2.13-2.7.0/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
В первом разделе указаны требования о необходимости готовности сети и файловой системы. Второй раздел указывает файлы для запуска и остановки службы. Здесь также присутствует директива перегрузки службы в случае ее некорректной работы.
После внесения соответствующих изменений и их сохранения выйдем из редактора.
После этого создадим unit-файл для службы Kafka:
$ sudo nano /etc/systemd/system/kafka.service
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=testing_kaf ExecStart=/bin/sh -c '/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-start.sh /home/testing_kaf/kafka_2.13-2.7.0/config/server.properties > /home/testing_kaf/kafka_2.13-2.7.0/kafka.log 2>&1' ExecStop=/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
В первом разделе установлена ??связь службы с сервисом ZooKeeper. Это обеспечивает автоматический запуск сервиса при старте службы. Во втором разделе определены файлы для выполнения рабочих действий, равно как и для ZooKeeper.
Сохраним внесенные изменения и выйдем из редактора.
Запуск сервера Apache Kafka
Запустим сервер с помощью следующей команды:
$ sudo systemctl start kafka
Убедимся в том, что сервер удачно запустился. Для этого проверим журналы модуля Kafka:
$ sudo systemctl status kafka
Ниже приведен результат выполнения команды:
? kafka.service Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled) Active: active (running) since Mon 2024-01-22 16:52:14 UTC; 23s ago Main PID: 574146 (sh) Tasks: 70 (limit: 2163) Memory: 337.3M CPU: 6.008s CGroup: /system.slice/kafka.service +-574146 /bin/sh -c "/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-start.sh /home/testing_kaf/kafka_2.13-2.7> L-574147 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX> Jan 22 16:52:14 dedicated systemd[1]: Started kafka.service.
Таким образом мы получили рабочий сервер Kafka, прослушивающий порт 9092.
В случае перезагрузки сервера необходимо принудительно запускать службы ZooKeeper и Kafka. Это можно сделать с помощью следующих команд:
$ sudo systemctl enable zookeeper $ sudo systemctl enable kafka
При этом создаются символические ссылки.
Тестирование сервера Apache Kafka
Проверим работу сервера на практике. В то же время, это будет критерием корректности его инсталляции и сделанных нами настроек.
В первую очередь, создадим топик или тему нашего канала с именем NewTopic. Для этого введем в терминале:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NewTopic
Итак, тема создана – «Created topic NewTopic».
Теперь опубликуем первое наше сообщение (Working with Kafka program) в созданном канале:
$ echo "Working with Kafka program" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NewTopic > /dev/null
После этого создадим получателя сообщений Kafka с помощью сценария kafka-console-consumer.sh. Здесь необходимо указать порт сервера ZooKeeper, а также имя хоста и имя темы.
Введем следующую команду:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewTopic --from-beginning
Опция --from-beginning позволяет использовать сообщения, созданные для запуска их получателя.
Можно убедиться, что на выходе команды мы получили отправленное нами сообщение – Working with Kafka program,что является признаком того, что все работает верно.
Обращаем внимание, что сервер продолжает прослушивать соответствующий порт, ожидая новых сообщений от издателя. Проверим это. Для этого запустим новое окно терминала и активируем аккаунт пользователя testing_kaf.
После этого создадим еще одно сообщение для публикации:
$ echo "Continue working with Kafka from another terminal" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NewTopic > /dev/null
Теперь переключимся на первое окно терминала, чтобы увидеть результат. Уведомление должно загрузиться в вывод получателя.
Можно убедиться, что сообщение «Continue working with Kafka from another terminal» появилось в окне его получателя. Итак, все работает.
Чтобы остановить работу сценария, достаточно нажать клавиши CTRL+C. Результат этих действий показан ниже.
Служба сообщает, что было получено два сообщения – Processed a total of 2 messages.
Повышение уровня безопасности при работе с кластером Kafka
Как известно, вопросы безопасности при работе в сети очень важны. У нас есть возможность повысить ее уровень за счет аннулирования (на постоянной основе или временно) администраторских прав пользователя сервера Apache Kafka.
Для этого зайдем в систему под любым «не root» пользователем с правами sudo.
Сначала удалим пользователя testing_kaf из группы sudo:
$ sudo deluser testing_kaf sudo
Результат – «Removing user 'testing_kaf' from group 'sudo' ... Done», то есть действие успешно исполнено.
Теперь заблокируем пароль указанного пользователя, что делает невозможным в дальнейшем вход в систему кого-либо под этой учетной записью. Введем в терминале:
$ sudo passwd testing_kaf -l
После этого только root-пользователь или пользователь с правами sudo смогут войти в систему с помощью аккаунта testing_kaf.
Это можно будет сделать с помощью следующей команды:
$ sudo su - testing_kaf
Если же нужно будет в дальнейшем разблокировать возможность смены пароля, можно воспользоваться следующей командой:
$ sudo passwd testing_kaf -u
Следует отметить, что возможности брокера Apache Kafka не ограничиваются только теми, что мы использовали на практике, их гораздо больше. В частности, это касается обеспечения гибкого управления работой кластеров, каждый из которых может иметь несколько брокеров для более равномерного распределения нагрузки. Но это выходит за пределы нашей статьи и потому здесь не рассматривается.
Подписывайтесь на наш телеграмм-канал https://t.me/freehostua, чтобы быть в курсе новых полезных материалов.
Смотрите наш канал Youtube на https://www.youtube.com/freehostua.
Мы в чем ошиблись, или что-то пропустили?
Напишите об этом в комментариях, мы с удовольствием ответим и обсуждаем Ваши замечания и предложения.
Дата: 25.01.2024 Автор: Ровник Александр
|
|
Авторам статьи важно Ваше мнение. Будем рады его обсудить с Вами:
comments powered by Disqus