17.2. 以编程方式使用Kafka数据存储区

17.2.1. 创建数据存储

假设GeoMesa代码位于类路径上,则可以通过普通的GeoTools发现方法获得Kafka数据存储的实例。要创建 KafkaDataStore 有两个必需的属性,一个用于ApacheKafka连接, kafka.brokers ,还有一个用于与ApacheZooKeeper连接, kafka.zookeepers 。一个可选参数, kafka.zk.path 用于在ZooKeeper中指定存储架构的路径。如果未指定ZK路径,则将使用默认路径。配置参数将在下面完整描述。

import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;

Map<String, Serializable> parameters = new HashMap<>();
parameters.put("kafka.brokers", "localhost:9092");
parameters.put("kafka.zookeepers", "localhost:2181");
DataStore dataStore = DataStoreFinder.getDataStore(parameters);

17.2.2. 卡夫卡数据存储参数

Kafka数据存储与大多数数据存储的不同之处在于,数据集完全保存在内存中。因此,可以在运行时通过数据存储参数配置内存中索引。看见 卡夫卡索引配置 有关可用索引选项的详细信息,请参阅。

因为配置选项可以引用来自特定SimpleFeatureType的属性,所以在处理多个模式时可能需要创建多个Kafka数据存储实例。

Kafka数据存储接受以下参数(必需的参数用标记 * ):

参数

类型

描述

kafka.brokers *

细绳

Kafka Brokers,例如“Localhost:9092”

kafka.zookeepers

细绳

Kafka ZooKeepers,例如“Localhost:2181”,用来在ZooKeeper而不是Kafka主题中持久化GeoMesa元数据。看见 无动物园饲养员使用 了解更多细节。

kafka.catalog.topic

细绳

用于存储模式元数据的Kafka主题(不使用ZooKeeper时)

kafka.zk.path

细绳

ZooKeeper可发现路径,可用于命名要素类型(使用ZooKeeper时)

kafka.producer.config

细绳

Kafka Producer的配置选项,采用Java属性格式。看见 Producer Configs

kafka.producer.clear

布尔型

在启动时发送“清除”信息。这将导致客户端忽略启动前主题中的所有数据

kafka.consumer.config

细绳

用于Kafka消费者的配置选项,采用Java属性格式。看见 Consumer Configs

kafka.consumer.read-back

细绳

在启动时,阅读在此时间范围内写入的消息(与忽略旧消息相比),例如‘1小时’。使用‘inf’阅读所有消息。如果启用,则只有在处理完所有现有消息后,才能查询功能。但是,功能监听程序仍将照常调用。看见 初始加载(重播)

kafka.consumer.count

整型

每个功能类型使用的Kafka消费者数量。设置为0以禁用消费(即仅生产者)

kafka.consumer.group-prefix

细绳

用于Kafka组ID的前缀,以更轻松地标识特定数据存储

kafka.consumer.start-on-demand

布尔型

默认行为是仅在首次请求该功能类型时才开始使用主题。如果从未查询过某些层,这可以减少负载。请注意,当将其设置为False时应小心,因为商店将立即开始从Kafka消费所有已知的功能类型,这可能需要大量的内存开销。

kafka.topic.partitions

整型

在新的卡夫卡主题中使用的分区数量

kafka.topic.replication

整型

在新的卡夫卡主题中使用的复制因子

kafka.serialization.type

细绳

用于Kafka消息的内部序列化格式。一定是其中之一 kryoavroavro-native

kafka.cache.expiry

细绳

在此延迟后使内存缓存中的功能过期,例如“10分钟”。看见 功能到期

kafka.cache.expiry.dynamic

细绳

基于CQL谓词动态终止要素。看见 功能到期

kafka.cache.event-time

细绳

根据特征数据确定过期时间,而不是消息时间。看见 功能事件时间

kafka.cache.event-time.ordering

布尔型

根据功能事件时间确定功能排序,而不是消息时间。看见 功能事件时间

kafka.index.cqengine

细绳

将基于CQEngine的属性索引用于内存中的功能缓存。看见 CQEngine索引

kafka.index.resolution.x

整型

空间索引的x维中的条柱数量,默认为360。看见 空间索引分辨率

kafka.index.resolution.y

整型

空间索引y维中的条柱数量,默认为180。看见 空间索引分辨率

kafka.index.tiers

细绳

用于索引具有范围的几何的层数和大小,格式为 x1:y1,x2:y2 。看见 空间索引分层

kafka.serialization.lazy

布尔型

使用延迟反序列化功能。这可能会改善处理负载,但代价是查询时间稍慢

kafka.layer.views

细绳

要公开为层的现有架构的其他视图。看见 层视图 有关详情

kafka.metrics.reporters

细绳

记者过去常常发布卡夫卡指标,称为TypeSafe配置。要使用多个记者,请将他们嵌套在密钥下 reporters 。看见 GeoMesa指标 有关详情

geomesa.query.loose-bounding-box

布尔型

使用松散的边界框,这样可以提高性能,但不准确

geomesa.query.audit

布尔型

审核传入的查询。默认情况下,审核将写入日志文件

geomesa.security.auths

细绳

用于查询数据的默认授权,以逗号分隔

有关使用GeoTool的更多信息,请参阅 GeoTools user guide