KafkaConnector
Se connecte à un cluster Apache Kafka pour envoyer et recevoir des messages.
Cas d'utilisation courants
- Géoenrichir des données sur un sujet Kafka.
- Agréger des données de sujets Kafka et autres flux de messages.
- L'interopérabilité avec des plateformes d'analyse Big Data
Notes
- KafkaConnector se connecte initialement à une liste de serveurs de démarrage, mais il peut se connecter à d'autres serveurs lorsqu'ils sont découverts. Il ne se connecte pas directement à l'instance de ZooKeeper.
- Le KafkaConnector permet des commits décalés automatiques ou manuels. Pour plus d'informations sur ce concept, voir "Commits et Offsets" sur https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html.
Configuration
Ports d'entrée
Ce Transformer accepte tous types d'entités.
Ports de sortie
La sortie de ce Transformer varie en fonction de l'action effectuée. Les données du message sont présentées comme des attributs d'entité.
- Après une action Réception, les entités en sortie représentent les messages reçus depuis le cluster.
- Après une action Envoi, les entités en sortie représentent les messages envoyés au cluster.
- Après une action Obtenir les métadonnées, l'entité en sortie représente le statut du cluster.
- Après une action Commit, l'entité de sortie représente la partition et le décalage engagés.
L'entité entrante ressort par ce port.
Les entités qui provoquent l'échec de l'opération ressortent par ce port. Un attribut fme_rejection_code, ayant la valeur ERROR_DURING_PROCESSING, sera ajouté, ainsi qu'un attribut fme_rejection_message plus descriptif qui contient des détails plus spécifiques sur la raison de l'échec.
Note: Si une entité entrant dans KafkaConnector possède déjà une valeur pour fme_rejection_code, cette dernière sera supprimée.
Gestion des entités rejetées : ce paramètre permet d'interrompre la traduction ou de la poursuivre lorsqu'elle rencontre une entité rejetée. Ce paramètre est disponible à la fois comme option par défaut de FME et comme paramètre de traitement.
Paramètres
Source des identifiants |
En plus de l'accès anonyme, le KafkaConnector peut s'authentifier auprès du cluster. L'utilisation d'une connexion web s'intègre mieux avec FME, mais dans certains cas, vous pouvez souhaiter utiliser l'une des autres sources.
|
Compte |
Disponible lorsque la source d'accréditation est une connexion Web. Pour créer une connexion Apache Kafka, cliquez sur la liste déroulante "Compte" et sélectionnez "Ajouter une connexion Web...". La connexion pourra ensuite être gérée via Outils -> Options FME... -> Connexions Web. |
Nom d'utilisateur et mot de passe |
Disponible lorsque la source d'accréditation est Intégrée. Un nom d'utilisateur et un mot de passe peuvent être spécifiés directement dans le Transformer au lieu d'une connexion web. |
Protocole de sécurité |
|
Mécanisme SASL |
|
Certificat SSL CA |
Lors de l'utilisation de l'un des protocoles de sécurité SSL, un chemin optionnel vers un certificat SS LA pour vérifier le cluster. |
La liste des serveurs bootstrap (hôte et port) auxquels se connecter. Voir l'option de configuration "bootstrap.servers" à l'adresse https://kafka.apache.org/documentation.html pour plus d'informations.
Action |
Le type d'opération à effectuer. Les choix possibles sont :
|
Les autres paramètres disponibles dépendent de la valeur du paramètre Requête > Action. Les paramètres de chaque action sont détaillés ci-dessous.
Sujets
Sujets |
La liste des sujets à recevoir. Les sujets peuvent être saisis manuellement, ou sélectionnés de manière interactive en cliquant sur les ellipses dans chaque ligne du tableau. |
Comportement de réception
Mode |
Deux options de réception de messages sont disponibles :
|
Maximum par seconde |
En mode Flux, cette option limite le nombre de messages qui seront reçus par seconde. Si vous laissez ce champ vide, le connecteur pourra recevoir autant de messages que possible. |
Taille des lots |
La méthode Lots spécifie le nombre de messages à lire par lot. |
Options de réception
ID de groupe utilisateur |
Le groupe de consommateurs dont fait partie le KafkaConnector. Pour plus d'informations sur les groupes de consommateurs, consultez la documentation Kafka : https://kafka.apache.org/documentation/#intro_consumers. |
Décalage de départ |
Un décalage de départ par défaut peut être spécifié :
|
Partition |
Quand Décalage de départ vaut Personnalisé, spécifie la partition pour assigner auquel assigner le connecteur. |
Décalage |
Quand le Décalage de départ vaut Personnalisé, spécifie le décalage à partir duquel recevoir. |
Commit automatique |
S'il faut automatiquement effectuer un commit des décalages de lecture
|
Sujets
Sujets |
La liste des sujets à envoyer. Les sujets peuvent être saisis manuellement, ou sélectionnés de manière interactive en cliquant sur les ellipses dans chaque ligne du tableau. |
Options d'envoi
Créer des sujets manquants |
Si Oui est sélectionné, les sujets seront automatiquement créés s'ils n'existent pas. |
Nombre de partitions |
Lors de la création de sujets, de nouveaux sujets seront créés avec ce nombre de partitions. |
Facteur de réplication |
Lors de la création de sujets, de nouveaux sujets seront créés avec ce facteur de réplication. |
La section "options de message" est disponible pour l'envoi et la réception de messages mais certaines options sont uniquement disponibles lors de l'envoi.
Clé de message |
Le contenu à définir pour la clé de message. La clé de message est généralement utilisée pour le partitionnement et le compactage des journaux. Disponible uniquement pour l'envoi. |
Valeur du message |
Les données réelles du message. Il doit s'agir d'une chaîne ou d'une valeur binaire. Uniquement disponible pour l'envoi. |
Schéma de message |
Le type de schéma de message peut être soit :
Pour plus d'informations sur les schémas Avro, voir https://avro.apache.org/docs/current/spec.html. |
Encodage |
Pour l'envoi, l'encodage à utiliser pour convertir la valeur d'entrée du texte en binaire. Les attributs numériques seront convertis en une représentation textuelle du nombre avant l'encodage. Si fme-binary est sélectionné, les chaînes seront encodées en utf-8, les attributs binaires seront envoyés tels quels, et les autres types d'attributs ne seront pas pris en charge. Pour la réception, les attributs _key et _value contiennent par défaut les octets bruts reçus du serveur. Si un encodage est sélectionné, le connecteur tentera plutôt de décoder la valeur du message reçu en une chaîne de caractères en utilisant l'encodage sélectionné. Disponible quand Simple est sélectionné pour le schéma de message. |
Registre de schéma |
En cas d'utilisation du schéma Avro, l'URL du registre de schémas. Voir https://docs.confluent.io/current/schema-registry/index.html pour plus d'informations sur la gestion des schémas en utilisant Confluent Schema Registry. |
Schéma clé |
Le schéma Avro basé sur JSON à utiliser pour la clé du message. Disponible uniquement pour l'envoi. |
Schéma de valeur |
Le schéma Avro basé sur JSON à utiliser pour la valeur du message. Disponible uniquement pour l'envoi. |
Options de métadonnées
L'obtention de métadonnées ne produit qu'un seul attribut de sortie : _metadata . Cet attribut contient des métadonnées au format JSON. Le format est une agrégation des métadonnées de cluster, de courtier, de sujet et de partition.
{
"cluster_id": < cluster ID > ,
"controller_broker_id": < controller broker ID > ,
"brokers": [
{
"id": < broker ID > ,
"host": < host > ,
"port": < port >
},
...
],
"topics": [
{
"name": < topic name > ,
"partitions": [
{
"id": < partition ID > ,
"leader_broker_id": < leader broker ID > ,
"replica_broker_ids": [< replica broker ID >, ...],
"in_sync_replicas": [< in-sync replica ID >, ...],
},
...
],
},
...
],
}
Topic
Topic |
Le sujet pour lequel il faut définir le décalage de commit. |
Options d'envoi
Identifiant du connecteur de réception |
Ce paramètre doit être défini comme la valeur de l'attribut _connector_id produit par le KafkaConnector en lecture. |
Décalage |
Le décalage pour lequel effectuer un commit, généralement défini à partir de l'attribut _offset associé au message. |
Partition |
La partition pour laquelle effectuer un commit , généralement définie à partir de l'attribut _partition associé au message. |
Pour toutes les actions, il est possible de fournir des options avancées supplémentaires. Pour l'envoi et la réception, ces options sont appelées respectivement producteur et consommateur.
Les options sont fournies sous forme d'objet JSON et correspondent aux options documentées à l'adresse https://docs.confluent.io/current/clients/librdkafka/CONFIGURATION_8md.html. Notez que toutes les options ne sont pas disponibles pour être utilisées dans le connecteur.
Exemples
{
"socket.timeout.ms": 100,
"check.crcs": true
}
Si des options fournies par l'utilisateur entrent en conflit avec celles générées en interne par le connecteur, les options fournies par l'utilisateur seront utilisées. Par exemple, FME définit généralement "bootstrap.servers" à partir du paramètre Brokers, mais si les options avancées contiennent également "bootstrap.servers", cette valeur sera utilisée à la place.
Les attributs suivants peuvent être sélectionnés pour être inclus dans les entités de sortie. Chaque entité de sortie représente un message qui a été soit envoyé, soit reçu.
_key |
La clé du message |
_value |
La valeur du message |
_len |
La longueur du message en octets |
_offset |
Le décalage de la partition |
_timestamp |
L'horodatage du message provenant du cluster. Selon la configuration du serveur, cet horodatage peut avoir des significations légèrement différentes. |
_topic |
Le sujet du message reçu ou envoyé |
_partition |
La partition dans laquelle le message est stockée. |
_headers{}.name |
Attributs de liste structurée contenant les clés et les valeurs d'en-tête d'un message reçu. Notez que les en-têtes des messages envoyés ne sont pas disponibles. |
Éditer les paramètres des Transformers
À l'aide d'un ensemble d'options de menu, les paramètres du Transformer peuvent être attribués en faisant référence à d'autres éléments de traitement. Des fonctions plus avancées, telles qu'un éditeur avancé et un éditeur arithmétique, sont également disponibles dans certains Transformers. Pour accéder à un menu de ces options, cliquez sur à côté du paramètre applicable. Pour plus d'informations, voir Options de menus et paramètres de Transformer.
Définir les valeurs
Il existe plusieurs façons de définir une valeur à utiliser dans un Transformer. La plus simple est de simplement taper une valeur ou une chaîne de caractères, qui peut inclure des fonctions de différents types comme des références d'attributs, des fonctions mathématiques et de chaînes de caractères, et des paramètres de traitement. Il existe un certain nombre d'outils et de raccourcis qui peuvent aider à construire des valeurs, généralement disponibles dans le menu contextuel déroulant adjacent au champ de valeur.
Utilisation de l'éditeur de texte
L'éditeur de texte fournit un moyen efficace de construire des chaînes de textes (dont les expressions régulières) à partir de données source diverses, telles que des attributs, des paramètres et des constantes, et le résultat est directement utilisé dans le paramètre.
Utilisation de l'éditeur arithmétique
L'éditeur arithmétique fournit un moyen simple de construire des expressions mathématiques à partir de plusieurs données source, telles que des attributs et des fonctions, et le résultat est directement utilisé dans un paramètre.
Valeur conditionnelle
Définit des valeurs selon un ou plusieurs tests.
Fenêtre de définition de conditions
Contenu
Les expressions et chaînes de caractères peuvent inclure des fonctions, caractères, paramètres et plus.
Lors du paramétrage des valeurs - qu'elles soient entrées directement dans un paramètre ou construites en utilisant l'un des éditeurs - les chaînes de caractères et les expressions contenant des fonctions Chaîne de caractères, Math, Date et heure ou Entité FME auront ces fonctions évaluées. Par conséquent, les noms de ces fonctions (sous la forme @<nom_de_fonction>) ne doivent pas être utilisés comme valeurs littérales de chaîne de caractères.
Ces fonctions manipulent les chaînes de caractères. | |
Caractères spéciaux |
Un ensemble de caractères de contrôle est disponible dans l'éditeur de texte. |
Plusieurs fonctions sont disponibles dans les deux éditeurs. | |
Fonctions Date/heure | Les fonctions de dates et heures sont disponibles dans l'Editeur texte. |
Ces opérateur sont disponibles dans l'éditeur arithmétique. | |
Elles retournent des valeurs spécifiques aux entités. | |
Les paramètres FME et spécifiques au traitement peuvent être utilisés. | |
Créer et modifier un paramètre publié | Créer ses propres paramètres éditables. |
Options - Tables
Les Transformers avec des paramètres de style table possèdent des outils additionnels pour remplir et manipuler des valeurs.
Réordonner
|
Activé une fois que vous avez cliqué sur un élément de ligne. Les choix comprennent :
|
Couper, Copier et Coller
|
Activé une fois que vous avez cliqué sur un élément de ligne. Les choix comprennent :
Copier, copier et coller peuvent être utilisés au sein d'un Transformer ou entre Transfromers. |
Filtre
|
Commencez à taper une chaîne de caractères, et la matrice n'affichera que les lignes correspondant à ces caractères. Recherche dans toutes les colonnes. Cela n'affecte que l'affichage des attributs dans le Transformer - cela ne change pas les attributs qui sont sortis. |
Importer
|
Le bouton d'import remplit la table avec un jeu de nouveaux attributs lus depuis un jeu de données. L'application spécifique varie selon les Transformers. |
Réinitialiser/Rafraîchir
|
Réinitialise la table à son état initial, et peut fournir des options additionnelles pour supprimer des entrées invalides. Le comportement varie d'un Transformer à l'autre. |
Note : Tous les outils ne sont pas disponibles dans tous les Transformers.
Références
Comportement |
|
Stockage des entités |
Non |
Dépendances | Apache Kafka |
Alias | ApacheKafkaConnector |
Historique | Implémenté dans FME 2019.0 |
FME Community
FME Community est l'endroit où trouver des démos, des tutoriaux, des articles, des FAQ et bien plus encore. Obtenez des réponses à vos questions, apprenez des autres utilisateurs et suggérez, votez et commentez de nouvelles entités.
Voir tous les résultats à propos de ce Transformer sur FME Community.
Les exemples peuvent contenir des informations sous licence Open Government - Vancouver et/ou Open Government - Canada.