
Access the Kafka Cluster by APISIX Gateway


This blog shows how to use Apache APISIX to develop a custom authorization plugin for a Kafka cluster.

Prerequisites

Expose the Kafka Cluster by KafkaBridge

To simplify the Kafka configuration, I provision Kafka with the strimzi-kafka-operator. To expose Kafka externally like other services, I use KafkaBridge to turn it into an HTTP service.

  • Create the KafkaBridge
# namespace
KAFKA_NAMESPACE=kafka

# create kafka bridge instance
cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-kafka-bridge
  namespace: ${KAFKA_NAMESPACE}
spec:
  bootstrapServers: kafka-kafka-bootstrap.${KAFKA_NAMESPACE}.svc:9092
  http:
    port: 8080
  replicas: 1
EOF
  • Verification
KAFKA_NAMESPACE=kafka
# forward 8080 by bridge pod
kubectl -n ${KAFKA_NAMESPACE} port-forward $(kubectl get pods -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080

# or forward 8080 by svc
kubectl -n ${KAFKA_NAMESPACE} port-forward svc/$(kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n ${KAFKA_NAMESPACE} -o jsonpath="{.items[0].metadata.name}") 8080:8080

# list topic
curl http://localhost:8080/topics

# consume message with the consumer
while true; do curl -X GET http://localhost:8080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done

Running APISIX on OpenShift

  • Install APISIX on ROSA
oc create sa apisix-sa -n apisix
oc adm policy add-scc-to-user anyuid -z apisix-sa -n apisix

helm install apisix apisix/apisix \
--set gateway.type=NodePort \
--set etcd.podSecurityContext.enabled=false \
--set etcd.containerSecurityContext.enabled=false \
--set serviceAccount.name=apisix-sa \
--namespace apisix
  • Configure the Kafka Route with Admin API
# forward 9180 port to local host
kubectl -n apisix port-forward $(kubectl get pods -l app.kubernetes.io/name=apisix -n apisix -o jsonpath="{.items[0].metadata.name}") 9180:9180

# the bridge service name can be looked up with
# kubectl get svc -l strimzi.io/cluster=strimzi-kafka-bridge -n $KAFKA_NAMESPACE -o jsonpath="{.items[0].metadata.name}"
curl "http://127.0.0.1:9180/apisix/admin/routes/1" \
-H "X-API-KEY: edd1c9f034335f136f87ad84b625c8f1" -X PUT -d '
{
"methods": ["GET", "POST", "DELETE", "PUT"],
"host": "example.com",
"uri": "/*",
"plugins": {
"ext-plugin-post-resp": {
"conf": [
{"name":"my-response-rewrite", "value":"{\"tag\":\"\"}"}
]
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"strimzi-kafka-bridge-bridge-service.kafka.svc:8080": 1
}
}
}'
  • Request the Kafka Service with Client API
# forward the http api of apisix to local host
kubectl -n apisix port-forward $(kubectl get pods -l app.kubernetes.io/name=apisix -n apisix -o jsonpath="{.items[0].metadata.name}") 9080:9080

# list topic
curl --verbose --header "Host: example.com" http://localhost:9080/topics

# send message to the topic
curl --header "Host: example.com" --location 'http://localhost:9080/topics/event' -H 'Content-Type: application/vnd.kafka.json.v2+json' --data \
'{
"records":[
{
"key":"event5",
"value": "hello5"
},
{
"key":"event6",
"value": "world6"
}
]
}'

# create a kafka consumer in a new consumer group
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group \
-H 'content-type: application/vnd.kafka.v2+json' \
-d '{
"name": "strimzi-kafka-consumer",
"auto.offset.reset": "earliest",
"format": "json",
"enable.auto.commit": true,
"fetch.min.bytes": 512,
"consumer.request.timeout.ms": 30000
}'

# subscribe to the topic
curl --header "Host: example.com" -X POST http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription \
-H 'content-type: application/vnd.kafka.v2+json' \
-d '{
"topics": [
"event"
]
}'

# consume message with the consumer
while true; do curl --header "Host: example.com" -X GET http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records \
-H 'accept: application/vnd.kafka.json.v2+json'; sleep 1; done

Develop an Authentication Plugin with Golang

  • Develop a validation plugin for the certificates

    I developed the plugin with the Go plugin runner. The plugin reads the certificate from a request header and then validates it. You can visit this for more detail.
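The validation step described above can be sketched with the Go standard library alone. This is a minimal illustration, not the plugin's actual code: the function names `parseHeaderCertificate` and `validateClientCertificate` and the `caPEM` trust bundle are hypothetical, and it assumes the header carries a base64-encoded PEM certificate, as in the verification commands later in this post. In a real plugin, logic like this would presumably be called from the runner's request filter.

```go
package main

import (
	"crypto/x509"
	"encoding/base64"
	"encoding/pem"
	"errors"
	"fmt"
	"strings"
)

// parseHeaderCertificate decodes a base64-encoded PEM certificate
// (the assumed format of the Client-Certificate header) into an x509 certificate.
func parseHeaderCertificate(headerValue string) (*x509.Certificate, error) {
	raw, err := base64.StdEncoding.DecodeString(strings.TrimSpace(headerValue))
	if err != nil {
		return nil, fmt.Errorf("header is not valid base64: %w", err)
	}
	block, _ := pem.Decode(raw)
	if block == nil || block.Type != "CERTIFICATE" {
		return nil, errors.New("header does not contain a PEM certificate")
	}
	return x509.ParseCertificate(block.Bytes)
}

// validateClientCertificate checks that the certificate from the header chains
// to one of the CA certificates in caPEM (a hypothetical trust bundle).
func validateClientCertificate(headerValue string, caPEM []byte) error {
	cert, err := parseHeaderCertificate(headerValue)
	if err != nil {
		return err
	}
	roots := x509.NewCertPool()
	if !roots.AppendCertsFromPEM(caPEM) {
		return errors.New("no CA certificates found in trust bundle")
	}
	// ExtKeyUsageAny skips the extended-key-usage check; tighten it as needed.
	_, err = cert.Verify(x509.VerifyOptions{
		Roots:     roots,
		KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
	})
	return err
}

func main() {
	// A malformed header value is rejected before any chain verification runs.
	err := validateClientCertificate("not-a-certificate", nil)
	fmt.Println("rejected:", err != nil)
}
```

Splitting parsing from chain verification keeps the two failure modes distinct, so a gateway could answer a malformed header differently from an untrusted certificate.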

  • Build the APISIX Image with the above Plugin

git clone git@github.com:apache/apisix-go-plugin-runner.git
# develop the plugin
...
# build binary
make build
# create a Dockerfile that adds the built binary to the APISIX image
cat > Dockerfile <<'EOF'
FROM apache/apisix:3.6.0-debian
COPY ./go-runner /usr/local/apisix/apisix-go-plugin-runner/go-runner
EOF
# build and push image
docker build -f ./Dockerfile -t quay.io/myan/apisix-360-go:0.1 .
docker push quay.io/myan/apisix-360-go:0.1
  • Start the Plugin Runner with the Server

    Modify config.yaml through the apisix ConfigMap.

  etcd:
    host: # it's possible to define multiple etcd host addresses of the same etcd cluster.
      - "http://apisix-etcd.apisix.svc.cluster.local:2379"
    prefix: "/apisix" # configuration prefix in etcd
    timeout: 30 # 30 seconds
...
# Nginx hides all environment variables by default, so you need to declare your variables first in conf/config.yaml
# https://github.com/apache/apisix/blob/master/docs/en/latest/external-plugin.md
nginx_config:
  envs:
    - APISIX_LISTEN_ADDRESS
    - APISIX_CONF_EXPIRE_TIME

ext-plugin:
  # path_for_test: "/tmp/runner.sock"
  cmd: ["/usr/local/apisix/apisix-go-plugin-runner/go-runner", "run", "-m", "prod"]
  • Replace the APISIX Deployment Image
# image: quay.io/myan/apisix-360-go:0.1
kubectl -n apisix set image deployment/apisix apisix=quay.io/myan/apisix-360-go:0.1
  • Verification
# set the certificate (strip line wraps so the header value stays on one line)
CERT_CONTENT_BASE64=$(base64 < rest/client.crt | tr -d '\n')

# list the topics
curl -i 'http://127.0.0.1:9080/topics' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.json.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64"

# create consumer
curl -X POST 'http://localhost:9080/consumers/strimzi-kafka-consumer-group' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64" \
-d '{
"name": "strimzi-kafka-consumer",
"auto.offset.reset": "earliest",
"format": "json",
"enable.auto.commit": true,
"fetch.min.bytes": 512,
"consumer.request.timeout.ms": 30000
}'

# subscribe the consumer 'strimzi-kafka-consumer' to the topic 'event'
curl -X POST 'http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/subscription' \
-H 'Host: example.com' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64" \
-d '{
"topics": ["event"]
}'

# consume message
curl -X GET 'http://localhost:9080/consumers/strimzi-kafka-consumer-group/instances/strimzi-kafka-consumer/records' \
-H 'Host: example.com' \
-H 'Accept: application/vnd.kafka.json.v2+json' \
-H 'Source: client' \
-H "Client-Certificate: $CERT_CONTENT_BASE64"
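For clients that call the gateway from Go rather than curl, the same request headers can be built as in the sketch below. The helper names `encodeCertificateHeader` and `newGatewayRequest` are hypothetical, and the placeholder bytes stand in for the contents of rest/client.crt. The header names and the example.com host come from the commands above.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
)

// encodeCertificateHeader returns the PEM bytes as single-line base64,
// which is safe to place in an HTTP header value.
func encodeCertificateHeader(certPEM []byte) string {
	return base64.StdEncoding.EncodeToString(certPEM)
}

// newGatewayRequest builds a request to the APISIX gateway carrying the
// headers used in the curl verification commands.
func newGatewayRequest(method, url string, certPEM []byte) (*http.Request, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	// net/http takes the Host from req.Host, not from an entry in req.Header.
	req.Host = "example.com"
	req.Header.Set("Accept", "application/vnd.kafka.json.v2+json")
	req.Header.Set("Source", "client")
	req.Header.Set("Client-Certificate", encodeCertificateHeader(certPEM))
	return req, nil
}

func main() {
	// Placeholder bytes; a real client would read rest/client.crt instead.
	req, err := newGatewayRequest(http.MethodGet, "http://127.0.0.1:9080/topics", []byte("placeholder"))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Host, req.Header.Get("Source"))
}
```

The request is only constructed here; passing it to `http.DefaultClient.Do` would send it through the port-forwarded gateway.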

References