# Kafka (Message queue service)

Apache Kafka is an open-source stream-processing software platform.

It is a framework for storing, reading and analyzing streaming data. See the [Kafka documentation](https://kafka.apache.org/documentation) for more information.

## Supported versions

You can select the major and minor version.

Patch versions are applied periodically for bug fixes and the like. When you deploy your app, you always get the latest available patches.

| Grid | Dedicated Gen 2 |
|------|-----------------|
| 3.7  | None available  |

### Deprecated versions

The following versions are [deprecated](https://fixed.docs.upsun.com/glossary.md#deprecated-versions).
They're available, but they aren't receiving security updates from upstream and aren't guaranteed to work.
They'll be removed in the future,
so migrate to one of the [supported versions](#supported-versions).

| Grid | Dedicated Gen 2 |
|------|-----------------|
| 3.6, 3.4, 3.2, 2.7, 2.6, 2.5, 2.4, 2.3, 2.2, 2.1 | None available |

## Relationship reference

Example information available through the [`PLATFORM_RELATIONSHIPS` environment variable](https://fixed.docs.upsun.com/development/variables/use-variables.md#use-provided-variables)
or by running `platform relationships`.

The relationship details can change when an app is redeployed or restarted, or when the relationship itself is changed.
Your apps should therefore read the `PLATFORM_RELATIONSHIPS` environment variable at runtime rather than hard-coding any values.

```json
{
  "service": "kafka",
  "ip": "123.456.78.90",
  "hostname": "azertyuiopqsdfghjklm.kafka.service._.eu-1.platformsh.site",
  "cluster": "azertyuiop-main-7rqtwti",
  "host": "kafka.internal",
  "rel": "kafka",
  "scheme": "kafka",
  "type": "kafka:3.7",
  "port": 9092
}
```
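
As a minimal sketch of reading these values at runtime, the following Python snippet decodes `PLATFORM_RELATIONSHIPS` (a base64-encoded JSON object that maps each relationship name to a list of endpoints) and builds a `host:port` string. The function name `kafka_bootstrap_server` is hypothetical, and the relationship is assumed to be named `kafka`.

```python
import base64
import json
import os


def kafka_bootstrap_server(env=os.environ, relationship="kafka"):
    """Return 'host:port' for the first endpoint of the given relationship.

    Hypothetical helper: the default relationship name "kafka" is an assumption.
    """
    encoded = env.get("PLATFORM_RELATIONSHIPS")
    if not encoded:
        raise RuntimeError("PLATFORM_RELATIONSHIPS is not set.")

    # The variable holds a base64-encoded JSON object that maps each
    # relationship name to a list of endpoint definitions.
    relationships = json.loads(base64.b64decode(encoded))

    endpoints = relationships.get(relationship)
    if not endpoints:
        raise RuntimeError(f"No relationship named {relationship!r}.")

    endpoint = endpoints[0]
    return f"{endpoint['host']}:{endpoint['port']}"
```

Passing the environment as an argument keeps the helper easy to exercise locally with a hand-built dictionary, where the variable isn't set.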

## Usage example

### 1. Configure the service

To define the service, use the `kafka` type:

```yaml  {location=".platform/services.yaml"}
# The name of the service container. Must be unique within a project.
<SERVICE_NAME>:
  type: kafka:<VERSION>
  disk: 512
```

Note that changing the name of the service replaces it with a brand new service and all existing data is lost.
Back up your data before changing the service.

### 2. Define the relationship

To define the relationship, use the following configuration:

```yaml {}
name: myapp
# Relationships enable access from this app to a given service.
# The example below shows simplified configuration leveraging a default service
# (identified from the relationship name) and a default endpoint.
# See the Application reference for all options for defining relationships and endpoints.
relationships:
  <SERVICE_NAME>:
```

You can define ``<SERVICE_NAME>`` as you like, as long as it’s unique among all defined services
and matches in both the application and services configuration.
The example above leverages [default endpoint](https://fixed.docs.upsun.com/create-apps/image-properties/relationships.md) configuration for relationships.
That is, it uses default endpoints behind the scenes, providing a [relationship](https://fixed.docs.upsun.com/create-apps/image-properties/relationships.md)
(the network address a service is accessible from) that is identical to the name of that service.
Depending on your needs, instead of default endpoint configuration,
you can use [explicit endpoint configuration](https://fixed.docs.upsun.com/create-apps/image-properties/relationships.md).
With the above definition, the application container now has [access to the service](#use-in-app) via the relationship ``<SERVICE_NAME>`` and its corresponding [PLATFORM_RELATIONSHIPS](https://fixed.docs.upsun.com/development/variables/use-variables.md#use-provided-variables).

Alternatively, to use explicit endpoint configuration, define the relationship in `.platform.app.yaml` as follows:

```yaml {}
name: myapp
# Relationships enable access from this app to a given service.
# The example below shows configuration with an explicitly set service name and endpoint.
# See the Application reference for all options for defining relationships and endpoints.
# Note that legacy definition of the relationship is still supported.
# More information: https://docs.upsun.com/anchors/fixed/app/reference/relationships/
relationships:
  <RELATIONSHIP_NAME>:
    service: <SERVICE_NAME>
    endpoint: kafka
```

You can define ``<SERVICE_NAME>`` and ``<RELATIONSHIP_NAME>`` as you like, as long as each is unique among all defined services and relationships
and matches in both the application and services configuration.
The example above leverages [explicit endpoint](https://fixed.docs.upsun.com/create-apps/image-properties/relationships.md) configuration for relationships.
Depending on your needs, instead of explicit endpoint configuration,
you can use [default endpoint configuration](https://fixed.docs.upsun.com/create-apps/image-properties/relationships.md).
With the above definition, the application container now has [access to the service](#use-in-app) via the relationship ``<RELATIONSHIP_NAME>`` and its corresponding [PLATFORM_RELATIONSHIPS](https://fixed.docs.upsun.com/development/variables/use-variables.md#use-provided-variables).

### Example configuration

#### [Service definition](https://fixed.docs.upsun.com/add-services.md)

```yaml  {location=".platform/services.yaml"}
# The name of the service container. Must be unique within a project.
kafka:
  type: kafka:3.7
  disk: 512
```

#### [App configuration](https://fixed.docs.upsun.com/create-apps.md)

```yaml {}
name: myapp
# Relationships enable access from this app to a given service.
# The example below shows simplified configuration leveraging a default service
# (identified from the relationship name) and a default endpoint.
# See the Application reference for all options for defining relationships and endpoints.
relationships:
  kafka:
```

With explicit endpoint configuration, the same relationship in `.platform.app.yaml` looks like this:

```yaml {}
name: myapp
# Relationships enable access from this app to a given service.
# The example below shows configuration with an explicitly set service name and endpoint.
# See the Application reference for all options for defining relationships and endpoints.
# Note that legacy definition of the relationship is still supported.
# More information: https://docs.upsun.com/anchors/fixed/app/reference/relationships/
relationships:
  kafka:
    service: kafka
    endpoint: kafka
```

### Use in app

To use the configured service in your app, add a configuration file similar to the following to your project.

```php {}
<?php

/*
With the rdkafka PHP extension installed, you can use the RdKafka\Producer class
to produce messages to a Kafka topic. Enable the extension in your app configuration:
applications:
  app:
    type: php:8.5
    runtime:
      extensions:
        - rdkafka
*/

declare(strict_types=1);

try {
    $host = getenv('KAFKA_HOST') ?: getenv('KAFKA_IP');
    $port = getenv('KAFKA_PORT');

    if (!$host || !$port) {
        throw new RuntimeException('Kafka environment variables not available.');
    }

    $broker = sprintf('%s:%s', $host, $port);

    $conf = new RdKafka\Conf();
    $conf->set('bootstrap.servers', $broker); // <-- important

    $producer = new RdKafka\Producer($conf);

    $topic = $producer->newTopic('test');

    $topic->produce(
        RD_KAFKA_PARTITION_UA, // let Kafka choose the partition
        0,
        'Hello, World!'
    );

    // Serve delivery reports / internal queue
    $producer->poll(0);

    // Wait for delivery
    $timeoutMs = 10000;
    $start = microtime(true);

    while ($producer->getOutQLen() > 0) {
        $producer->poll(100);

        if ((microtime(true) - $start) * 1000 > $timeoutMs) {
            break;
        }
    }

    $result = $producer->flush($timeoutMs);

    if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw new RuntimeException('Unable to flush messages (not delivered within timeout).');
    }

    echo "Message delivered.\n";
} catch (\Throwable $e) {
    print $e->getMessage();
}

```

```python {}
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config

def usage_example():
    # Create a new Config object to ease reading the Upsun Fixed environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')

    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])

        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        for e in range(10):
            data = {'number' : e}
            producer.send('numtest', value=data)
        # send() is asynchronous; block until all buffered messages are sent.
        producer.flush()

        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )

        consumer.subscribe(['numtest'])

        output = ''
        # For demonstration purposes, read only 10 messages so the loop terminates.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        return output

    except Exception as e:
        return e

```

```ruby {}

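# With the ruby-kafka gem
require "base64"
require "json"
require "kafka"

# Read the connection details from the relationship instead of hard-coding them.
relationships = JSON.parse(Base64.decode64(ENV["PLATFORM_RELATIONSHIPS"]))
credentials = relationships["kafka"][0]
broker = "#{credentials["host"]}:#{credentials["port"]}"

# Producer
kafka = Kafka.new([broker], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end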

```
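
The Python example above encodes each message value as UTF-8 JSON on the producer side and decodes it again on the consumer side. The sketch below isolates that round trip so it can be checked without a running broker. The helper names `serialize` and `deserialize` are hypothetical and not part of any Kafka client.

```python
import json


def serialize(value):
    """Encode a JSON-compatible value as UTF-8 bytes, as the producer's value_serializer does."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Decode UTF-8 JSON bytes received by the consumer back into a Python object."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    # Simulate three messages passing through the serializer and back.
    payloads = [serialize({"number": n}) for n in range(3)]
    numbers = [deserialize(raw)["number"] for raw in payloads]
    print(", ".join(str(n) for n in numbers))
```

With kafka-python, `serialize` could be passed as the producer's `value_serializer` and `deserialize` as the consumer's `value_deserializer`, so that `message.value` arrives already decoded.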

The specific way to inject configuration into your application varies.
Consult your application or framework's documentation.

