• База знаний
  • /
  • Блог
  • /
  • Wiki
  • /
  • ONLINE CHAT
+380 (44) 364 05 71

Статья также доступна на украинском (перейти к просмотру).

Статья по Apache Kafka. Что это, для чего используется, как установить на Ubuntu

Оглавление

Использование брокеров сообщений позволяет обеспечить быструю связь между компонентами распределенных систем, упростить масштабирование. Существует несколько типов брокеров, нацеленных на выполнение определенных задач. В частности, для систем обмена сообщениями для публикации или подписки критическим показателем является скорость обработки данных, требующая наличия брокера с соответствующими характеристиками. Apache Kafka наиболее полно отвечает указанному и некоторым другим критериям, что сделало его довольно популярным средством в указанном сегменте. Рассмотрим технические возможности и продемонстрируем работу с ним на практике.

Что такое брокер сообщений

Это программный модуль, выполняющий посреднические функции в коммуникациях между компонентами распределенных систем. Основными элементами любого брокера является издатель и их получатель или подписчик. Брокер определяет протокол взаимодействия между ними, то есть устанавливает правила, регулирующие очередность и время формирования сообщений, время их хранения на сервере и некоторые другие.

Брокеры позволяют организовать асинхронный режим обмена данными и могут быть полезны в следующих случаях:

  • Для реализации отложенного выполнения действий;
  • Сглаживание пиковых нагрузок;
  • Отправка большого количества сообщений;
  • Реализации сложной бизнес-логики распределенных систем;
  • Обеспечение персистентности данных;
  • Организации выполнения задач по расписанию.

Наиболее популярными брокерами, кроме Apache Kafka, являются RabbitMQ и Redis, каждый из которых ориентирован на определенное направление использования.

Особенности Apache Kafka

Программное средство является одним из самых быстрых среди существующих брокеров, скорость обработки сообщений составляет около 1 млн. в секунду. Кроме того, можно выделить следующие его особенности:

  • Автоматическое информирование клиентов (подписчиков) о наличии уведомлений;
  • Генерирование «неограниченного» количества сообщений;
  • Обеспечение персистентности данных – возможность обновить или прочесть сообщение позже;
  • Гарантирование четкой последовательности получения сообщений подписчиками в пределах топиков.

Указанные особенности программы позволяют ей занимать первые позиции при использовании в системах публикации или подписки, о чем уже говорилось выше.

Создание условий и программной среды для установки сервера Apache Kafka

Основными требованиями для успешной установки и работы сервера Apache Kafka есть следующие:

  • Наличие Linux-сервера под управлением ОС Ubuntu;
  • Минимум 4 Gb оперативной памяти (ОП) компьютера;
  • Учетная запись «не root» пользователя с правами sudo;
  • OpenJDK версии 11 или выше.

Нами будет использован сервер под управлением ОС Ubuntu 22.04, обычно используемый для работы VPS-серверів. Он имеет 4 Gb ОП и потому первые два требования нами выполнены. Теперь сконцентрируемся на исполнении последних двух требований.

Создание пользователя с правами sudo

Создадим пользователя с именем testing_kaf. Для этого наберем в терминале следующую команду:

$ sudo adduser testing_kaf

Создание пользователя testing_kaf

Пользователь успешно создан. Добавим его в группу пользователей с правами sudo:

$ sudo adduser testing_kaf sudo

Добавление пользователя к группе sudo

Результат – Adding user testing_kaf to group sudo Done (Пользователь testing_kaf добавлен в группу sudo).

Переключимся на созданную учетную запись, не выходя из системы:

$ su -l testing_kaf

Переключение на пользователя testing_kaf

Следовательно, пользователь testing_kaf с правами sudo является активным. Одно условие выполнено.

Развертывание OpenJDK

Программа Apache Kafka создана на языке Java и поэтому для полноценного ее функционирования необходимо наличие виртуальной машины JVM. Начнем процесс ее создания в нашей среде.

Для начала обновим индекс программных пакетов Ubuntu:

$ sudo apt update

Обновление пакетов

Проверим имеющиеся в репозитории версии Java:

$ java -version

Проврка версии Java

ОВыберем версию 2:1.11-72build2 и начнем процесс инсталляции. Набираем в терминале команду для установки среды выполнения JRE по умолчанию. Она установит JRE із OpenJDK 11:

$ sudo apt install default-jre

Установка JRE

Соглашаемся с выделением дополнительных 366 Mb дискового пространства, введя кнопку Y. После этого инициируется процесс загрузки пакетов.

Процесс инсталации JRE

Процесс инсталяции JRE

Проверим корректность инсталляции JRE с помощью следующей команды:

$ java -version

Проверка корректности инсталяции

Итак, JRE установлено. Теперь установим Java Development Kit (JDK), необходимое для компиляции и запуска определенных типов приложений на основе Java. Для этого введем в терминале:

$ sudo apt install default-jdk

 Установка JDK

Процесс инсталяции JDK

Установка завершена.Убедимся в том, что компонент JDK установлен с помощью определения версии компилятора javac:

$ javac -version

Проверка версии компилятора javac

Итак, все хорошо – компонент успешно установлен.

Загрузка и размещение программы на сервере Ubuntu

Воспользуемся ссылкой на сайт apache.org для загрузки кода Apache Kafka на наш сервер Наберем в терминале:

$ wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz

Загрузка архива с Apache Kafka

Файл с кодом получен. Распаковываем его с помощью следующей команды:

$ tar -xzf kafka_2.13-2.7.0.tgz

Распаковка архіва

Перейдем в каталог программы:

$ cd kafka_2.13-2.7.0

Переход в каталог программы

Конфигурирование сервера Apache Kafka

Конфигурирование любого сервера, VPS, или другого является неотъемлемым условием сбалансированной работы всей системы. В данном случае это включает установку нужных значений параметров в конфигурационном файлеserver.properties и создание необходимых unit-файлів для подсистемы управления службами systemd.

Сначала внесем изменения в конфигурационный файл. Изменения будут касаться двух опций – включения возможности удаления топиков или тем (параметр delete.topic.enable), а также фиксацию местонахождения файлов журналов (параметр log.dirs).

Откроем файл с помощью редактора и внесем соответствующие изменения:

$ nano config/server.properties

Для первого параметра:

log.dirs=/home/testing_kaf/logs

Редактирование файла server.properties через nano

Для второго параметра:

delete.topic.enable = true

Редактирование файла server.properties через nano

Сохраним внесенные изменения и выйдем из редактора.

Следующим шагом будет создание unit-файлов модулей systemd для службы Kafka. Они обеспечивают такие действия, как запуск, остановка и перезагрузка службы. Кроме того, Kafka использует службу Apache ZooKeeper, обеспечивающую управление состояниями и конфигурациями кластера. И поэтому для нее тоже нужно будет создать unit-файл.

Создадим сначала unit-файл для ZooKeeper. Для этого с помощью редактора создадим пустой файл, в который внесем соответствующий код, приведенный ниже.

Вызов редактора:

$ sudo nano /etc/systemd/system/zookeeper.service

Код:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
 
[Service]
Type=simple
User=testing_kaf
ExecStart=/home/testing_kaf/kafka_2.13-2.7.0/bin/zookeeper-server-start.sh /home/testing_kaf/kafka_2.13-2.7.0/config/zookeeper.properties
ExecStop=/home/testing_kaf/kafka_2.13-2.7.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal
 
[Install]
WantedBy=multi-user.target

В первом разделе указаны требования о необходимости готовности сети и файловой системы. Второй раздел указывает файлы для запуска и остановки службы. Здесь также присутствует директива перегрузки службы в случае ее некорректной работы.

Файл zookeeper.service

После внесения соответствующих изменений и их сохранения выйдем из редактора.

После этого создадим unit-файл для службы Kafka:

$ sudo nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
 
[Service]
Type=simple
User=testing_kaf
ExecStart=/bin/sh -c '/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-start.sh /home/testing_kaf/kafka_2.13-2.7.0/config/server.properties > /home/testing_kaf/kafka_2.13-2.7.0/kafka.log 2>&1'
ExecStop=/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal
 
[Install]
WantedBy=multi-user.target

В первом разделе установлена ??связь службы с сервисом ZooKeeper. Это обеспечивает автоматический запуск сервиса при старте службы. Во втором разделе определены файлы для выполнения рабочих действий, равно как и для ZooKeeper.

Файл kafka.service

Сохраним внесенные изменения и выйдем из редактора.

Запуск сервера Apache Kafka

Запустим сервер с помощью следующей команды:

$ sudo systemctl start kafka

Запуск сервера Kafka

Убедимся в том, что сервер удачно запустился. Для этого проверим журналы модуля Kafka:

$ sudo systemctl status kafka

Проверка журнала модуля Kafka

Ниже приведен результат выполнения команды:

? kafka.service
 
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Mon 2024-01-22 16:52:14 UTC; 23s ago
     Main PID: 574146 (sh)
     Tasks: 70 (limit: 2163)
     Memory: 337.3M
     CPU: 6.008s
     CGroup: /system.slice/kafka.service
             +-574146 /bin/sh -c "/home/testing_kaf/kafka_2.13-2.7.0/bin/kafka-server-start.sh /home/testing_kaf/kafka_2.13-2.7>
             L-574147 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX>
 Jan 22 16:52:14 dedicated systemd[1]: Started kafka.service.

Таким образом мы получили рабочий сервер Kafka, прослушивающий порт 9092.

В случае перезагрузки сервера необходимо принудительно запускать службы ZooKeeper и Kafka. Это можно сделать с помощью следующих команд:

$ sudo systemctl enable zookeeper
$ sudo systemctl enable kafka

При этом создаются символические ссылки.

Тестирование сервера Apache Kafka

Проверим работу сервера на практике. В то же время, это будет критерием корректности его инсталляции и сделанных нами настроек.

В первую очередь, создадим топик или тему нашего канала с именем NewTopic. Для этого введем в терминале:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NewTopic

Создание топика

Итак, тема создана – «Created topic NewTopic».

Теперь опубликуем первое наше сообщение (Working with Kafka program) в созданном канале:

$ echo "Working with Kafka program" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NewTopic > /dev/null

Публикация сообщения в топике

После этого создадим получателя сообщений Kafka с помощью сценария kafka-console-consumer.sh. Здесь необходимо указать порт сервера ZooKeeper, а также имя хоста и имя темы.

Введем следующую команду:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewTopic --from-beginning

Опция --from-beginning позволяет использовать сообщения, созданные для запуска их получателя.

Работа опции --from-beginning

Можно убедиться, что на выходе команды мы получили отправленное нами сообщение – Working with Kafka program,что является признаком того, что все работает верно.

Обращаем внимание, что сервер продолжает прослушивать соответствующий порт, ожидая новых сообщений от издателя. Проверим это. Для этого запустим новое окно терминала и активируем аккаунт пользователя testing_kaf.

После этого создадим еще одно сообщение для публикации:

$ echo "Continue working with Kafka from another terminal" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic NewTopic > /dev/null

Сообщение в новом окне

Теперь переключимся на первое окно терминала, чтобы увидеть результат. Уведомление должно загрузиться в вывод получателя.

Вивод сообщения в первом окне

Можно убедиться, что сообщение «Continue working with Kafka from another terminal» появилось в окне его получателя. Итак, все работает.

Чтобы остановить работу сценария, достаточно нажать клавиши CTRL+C. Результат этих действий показан ниже.

Остановка работи сценария

Служба сообщает, что было получено два сообщения – Processed a total of 2 messages.

Повышение уровня безопасности при работе с кластером Kafka

Как известно, вопросы безопасности при работе в сети очень важны. У нас есть возможность повысить ее уровень за счет аннулирования (на постоянной основе или временно) администраторских прав пользователя сервера Apache Kafka.

Для этого зайдем в систему под любым «не root» пользователем с правами sudo.

Сначала удалим пользователя testing_kaf из группы sudo:

$ sudo deluser testing_kaf sudo

Удаление пользователя с группы sudo

Результат – «Removing user 'testing_kaf' from group 'sudo' ... Done», то есть действие успешно исполнено.

Теперь заблокируем пароль указанного пользователя, что делает невозможным в дальнейшем вход в систему кого-либо под этой учетной записью. Введем в терминале:

$ sudo passwd testing_kaf -l

После этого только root-пользователь или пользователь с правами sudo смогут войти в систему с помощью аккаунта testing_kaf.

Это можно будет сделать с помощью следующей команды:

$ sudo su - testing_kaf

Если же нужно будет в дальнейшем разблокировать возможность смены пароля, можно воспользоваться следующей командой:

$ sudo passwd testing_kaf -u

Следует отметить, что возможности брокера Apache Kafka не ограничиваются только теми, что мы использовали на практике, их гораздо больше. В частности, это касается обеспечения гибкого управления работой кластеров, каждый из которых может иметь несколько брокеров для более равномерного распределения нагрузки. Но это выходит за пределы нашей статьи и потому здесь не рассматривается.

Подписывайтесь на наш телеграмм-канал https://t.me/freehostua, чтобы быть в курсе новых полезных материалов.

Смотрите наш канал Youtube на https://www.youtube.com/freehostua.

Мы в чем ошиблись, или что-то пропустили?

Напишите об этом в комментариях, мы с удовольствием ответим и обсуждаем Ваши замечания и предложения.

Дата: 25.01.2024
Автор: Ровник Александр
Голосование

Авторам статьи важно Ваше мнение. Будем рады его обсудить с Вами:

comments powered by Disqus
navigate
go
exit
Спасибо, что выбираете FREEhost.UA