FLaNK-CDC

CDC with NiFi, Kafka Connect, Flink SQL, Cloudera Data in Motion

Cat Data Capture

https://youtube.com/shorts/dBh8Aig-JVU?feature=share

Flink CDC - Debezium - Postgresql

https://github.com/tspannhw/FLaNK-CDC/blob/main/flinkcdc.MD

Kafka Connect CDC - Debezium - Postgresql

https://github.com/tspannhw/FLaNK-CDC/blob/main/kafkacdc.md

Other Resources

Flink SQL Kudu



`kudu`.`default_database`.`default.sensors`


SQL Stream Builder - Flink SQL - Insert from Kafka to Kudu

CREATE TABLE `ssb`.`ssb_default`.`iotenriched` (
  `sensor_id` BIGINT,
  `sensor_ts` BIGINT,
  `is_healthy` BIGINT,
  `response` ROW<`result` BIGINT>,
  `sensor_0` BIGINT,
  `sensor_1` BIGINT,
  `sensor_2` BIGINT,
  `sensor_3` BIGINT,
  `sensor_4` BIGINT,
  `sensor_5` BIGINT,
  `sensor_6` BIGINT,
  `sensor_7` BIGINT,
  `sensor_8` BIGINT,
  `sensor_9` BIGINT,
  `sensor_10` BIGINT,
  `sensor_11` BIGINT,
  `eventTimestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'cdp.52.23.78.5.nip.io:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'iot_enriched',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'iotenrichedssbreader1'
);
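The `WATERMARK` clause above makes `eventTimestamp` an event-time attribute, so the table can feed windowed queries. A minimal sketch of a one-minute tumbling-window average follows. It assumes a Flink version that supports windowing table-valued functions (1.13 or later). The window size and the output alias `avg_sensor_0` are illustrative choices, not taken from the repository.

```sql
-- Sketch only: average sensor_0 per sensor over one-minute event-time windows.
-- The 1-minute window and the alias avg_sensor_0 are assumptions for illustration.
SELECT
  window_start,
  window_end,
  sensor_id,
  -- Cast before averaging so the result is not truncated to an integer.
  AVG(CAST(sensor_0 AS DOUBLE)) AS avg_sensor_0
FROM TABLE(
  TUMBLE(TABLE iotenriched, DESCRIPTOR(eventTimestamp), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, sensor_id;
```

Because the watermark trails the Kafka record timestamp by 3 seconds, each window should emit its result about 3 seconds of event time after the window closes.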



-- Copy rows from the Kafka-backed iotenriched table into the Kudu sensors table,
-- casting the BIGINT source columns to INT and DOUBLE.
INSERT INTO `kudu`.`default_database`.`default.sensors`
SELECT CAST(iotenriched.sensor_id AS INT) AS sensor_id,
       iotenriched.sensor_ts,
       CAST(iotenriched.sensor_0 AS DOUBLE) AS sensor_0,
       CAST(iotenriched.sensor_1 AS DOUBLE) AS sensor_1,
       CAST(iotenriched.sensor_2 AS DOUBLE) AS sensor_2,
       CAST(iotenriched.sensor_3 AS DOUBLE) AS sensor_3,
       CAST(iotenriched.sensor_4 AS DOUBLE) AS sensor_4,
       CAST(iotenriched.sensor_5 AS DOUBLE) AS sensor_5,
       CAST(iotenriched.sensor_6 AS DOUBLE) AS sensor_6,
       CAST(iotenriched.sensor_7 AS DOUBLE) AS sensor_7,
       CAST(iotenriched.sensor_8 AS DOUBLE) AS sensor_8,
       CAST(iotenriched.sensor_9 AS DOUBLE) AS sensor_9,
       CAST(iotenriched.sensor_10 AS DOUBLE) AS sensor_10,
       CAST(iotenriched.sensor_11 AS DOUBLE) AS sensor_11,
       CAST(iotenriched.is_healthy AS INT) AS is_healthy
FROM iotenriched;
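The insert only succeeds if the Kudu table already exists with matching column order and types. The repository does not show that table's definition, so the following is a hypothetical sketch in Impala SQL, inferred from the casts in the insert statement. The primary key, the hash partitioning, and the partition count are all assumptions.

```sql
-- Hypothetical Impala DDL for the target Kudu table; not taken from the repository.
-- Column order and types mirror the SELECT list of the insert statement.
CREATE TABLE default.sensors (
  sensor_id  INT,
  sensor_ts  BIGINT,
  sensor_0   DOUBLE,
  sensor_1   DOUBLE,
  sensor_2   DOUBLE,
  sensor_3   DOUBLE,
  sensor_4   DOUBLE,
  sensor_5   DOUBLE,
  sensor_6   DOUBLE,
  sensor_7   DOUBLE,
  sensor_8   DOUBLE,
  sensor_9   DOUBLE,
  sensor_10  DOUBLE,
  sensor_11  DOUBLE,
  is_healthy INT,
  -- Assumed key: Kudu requires a primary key, and its columns must come first.
  PRIMARY KEY (sensor_id, sensor_ts)
)
PARTITION BY HASH (sensor_id) PARTITIONS 4  -- assumed partitioning
STORED AS KUDU;
```

If the real table uses a different key or column types, adjust the casts in the insert statement to match it rather than the other way around.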

