Kafka
Optimizations
- File system: XFS
- Mount option: noatime
- vm.swappiness: 1
- vm.dirty_background_ratio: <10
- vm.dirty_ratio: >20
- net.core.wmem_default: 2097152
- net.core.rmem_default: 2097152
- net.ipv4.tcp_wmem: 4096 65536 2048000
- net.ipv4.tcp_rmem: 4096 65536 2048000
- net.core.wmem_max: > net.ipv4.tcp_wmem
- net.core.rmem_max: > net.ipv4.tcp_rmem
- net.ipv4.tcp_window_scaling: 1
- net.ipv4.tcp_max_syn_backlog: > 1024
- net.core.netdev_max_backlog: > 1000
View consumer groups
<path>/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
View consumer group information
<path>/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group name>
<path>/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group name> --state
Run consumer
<path>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic name> --from-beginning
Run Producer
<path>/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic name>
<path>/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic name>
Show dirty pages
cat /proc/vmstat | egrep "dirty|writeback"
Create user
<path>/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=<password>],SCRAM-SHA-512=[password=<password>]' --entity-type users --entity-name <user name>
Create topic
<path>/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic <topic name>
View topic
<path>/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic name>
View topics with overrides parameters
<path>/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topics-with-overrides
View partitions that are not consistent with the replica
<path>/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
View partitions with no master replica
<path>/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --unavailable-partitions
Parameters
–group <> - Rules on consumer group
–topic <> - Rules on topic
–consumer - Rules on consumer
–producer - Rules on producer
–cluster - Rules on cluster
View
<path>/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic <topic name>
Add
<path>/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:<user name> --operation <operation> --topic <topic name>
Remove
<path>/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:<user name> --operation <operation> --topic <topic name>
Important: enabling SSL disables Zero Copy.
Server
listeners=PLAINTEXT://:9092,SSL://<server name>:9093
advertised.listeners=PLAINTEXT://:9092,SSL://<server name>:9093
ssl.keystore.location=<keystore path>
ssl.keystore.password=<keystore password>
ssl.truststore.location=<truststore path>
ssl.truststore.password=<truststore password>
Client
security.protocol=SSL
ssl.truststore.location=<truststore path>
ssl.truststore.password=<truststore password>
Certification
- Note: generate the certificate without an email field when using a DN
- If email exist, use principal “1.2.840.113549.1.9.1=#<email in hex>,CN=,OU=,O=,L=,ST=,C=”
[req]
default_bits = 2048
default_md = sha256
distinguished_name = req_distinguished_name
prompt = no
utf8 = yes
[req_distinguished_name]
countryName = <RU/EU..>
stateOrProvinceName = <name of state or province>
localityName = <name of city>
organizationName = <name of organization>
organizationalUnitName = <division name>
commonName = <domain name if have not alternative names. !Required Field!>
Server
ssl.client.auth=required
ssl.principal.mapping.rules= \
RULE:^CN=([a-zA-Z0-9.]*).*$/$1/L ,\
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,\
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,\
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,\
DEFAULT
Client
security.protocol=SSL
ssl.keystore.location=<keystore path>
ssl.keystore.password=<keystore password>
ssl.truststore.location=<truststore path>
ssl.truststore.password=<truststore password>
Server
File kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="<user>"
password="<password>";
};
File server.properties
listeners=SASL_SSL://<server name>:9093
advertised.listeners=SASL_SSL://<server name>:9093
sasl.enabled.mechanisms=SCRAM-SHA-512,SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.inter.broker.protocol=SASL_SSL
export KAFKA_OPTS="-Djava.security.auth.login.config=<path>/kafka_server_jaas.conf"
Client
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<user name>" \
password="<password>";
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
# Only before 1st start
zookeeper.set.acl=true
KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
Server
File server.properties
sasl.enabled.mechanisms=SCRAM-SHA-512,SCRAM-SHA-256,OAUTHBEARER
listener.name.sasl_ssl.sasl.enabled.mechanisms=OAUTHBEARER,SCRAM-SHA-512,SCRAM-SHA-256
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.oauthbearer.expected.audience=account
sasl.oauthbearer.token.endpoint.url=https://<Keycloak host>:8443/realms/<realm name>/protocol/openid-connect/token
sasl.oauthbearer.jwks.endpoint.url=https://<Keycloak host>:8443/realms/<realm name>/protocol/openid-connect/certs
sasl.oauthbearer.expected.issuer=https://<Keycloak host>:8443/realms/<realm name>
Client
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://<Keycloak host>:8443/realms/<realm name>/protocol/openid-connect/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="<client id>" \
clientSecret="<client secret>";
File kafka_server_jaas.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="<user>"
password="<password>";
};
acks = all
min.insync.replicas = <number of replicas; must be less than replication.factor>