Second Brain: Crafted, Curated, Connected, Compounded on October 2
CDC Implementation and Optimization

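This article describes how, in a cloud environment with an event-driven architecture, custom logic can implement Change Data Capture (CDC) to minimize repeated downloads of properties that have already been downloaded. The author wrote two functions: one lists all properties matching the search criteria, and the other compares those properties with the existing ones. Changes are detected through a fingerprint that combines the property ID and the selling price (normalized_price). The author also explains how the selling prices are scraped from the website and how Delta Lake's SQL API is used for the comparison.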

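🔍 A fingerprint that combines the property ID and the selling price (normalized_price) is used to detect changes to a property.

📊 The author wrote two functions: one lists all properties matching the search criteria, and the other compares the new properties with the existing ones.

🌐 The selling price for each property ID is scraped from the website, so price changes can be tracked over time.

💻 With Delta Lake's SQL API, a simple SELECT statement compares the new and existing property data.

🔄 The mechanism is extensible: adding more relevant columns to the fingerprint extends the change detection without modifying any other code.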

CDC is a powerful tool, especially in cloud environments with event-driven architectures. I used it to avoid downloading properties I had already downloaded. Although open-source CDC solutions such as Debezium exist, I implemented my own simple logic to detect changes, partly because I have no access to the source OLTP database where the properties are stored, which such tools need.

I accomplish the CDC with two functions. The first one lists all properties matching certain search criteria, and the second one compares these properties with the existing ones. How am I doing that? Primarily, I create a fingerprint for each property that tells me whether the property is new or already existing. You might ask why I'm not just using the unique property ID. The reason is that I didn't only want to check whether I already have the property. As mentioned in the intro, I also wanted to check whether the seller lowered the price over time, so that I can be notified when the seller can't get rid of their house or flat. My fingerprint therefore combines the property ID and the selling price (called normalized_price in my data). One more benefit: if more columns become relevant, I can just add them to the fingerprint, and my CDC mechanism is extended without changing any other code.
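The original fingerprint code is not shown here, so the following is only a minimal sketch of the idea under stated assumptions: each property is a plain dict, the fingerprint is a SHA-256 hash over the chosen columns, and the names fingerprint and FINGERPRINT_COLUMNS are hypothetical. Only the id and normalized_price columns come from the text above.

```python
import hashlib

# Columns that make up the fingerprint (assumed names, taken from the text).
# Adding a column name here extends the change detection.
FINGERPRINT_COLUMNS = ("id", "normalized_price")


def fingerprint(prop: dict, columns=FINGERPRINT_COLUMNS) -> str:
    """Return a stable hash over the selected columns of one property record."""
    # Join the values with a separator so ("1", "23") and ("12", "3") differ.
    raw = "|".join(str(prop[col]) for col in columns)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# Illustrative records, not real data.
listing = {"id": "4711", "normalized_price": 850000, "city": "Bern"}
reduced = {**listing, "normalized_price": 820000}
moved = {**listing, "city": "Basel"}

# Same ID but a lower price: the fingerprint changes.
price_change_detected = fingerprint(listing) != fingerprint(reduced)

# A column outside the fingerprint (city) does not affect it ...
city_ignored = fingerprint(listing) == fingerprint(moved)

# ... until that column is added to the fingerprint columns.
extended = FINGERPRINT_COLUMNS + ("city",)
city_detected = fingerprint(listing, extended) != fingerprint(moved, extended)

print(price_change_detected, city_ignored, city_detected)
```

Hashing is a design choice of this sketch, not something the text prescribes; a plain concatenation of the two values would serve the same purpose as long as it is built the same way on both sides of the comparison.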

To get the relevant selling price for each property ID, I scrape the prices from the website separately, the same way as the IDs themselves. You can check that code in solid_scraping.py. The function is called list_props_immo24; it returns all properties matching my search criteria as a data frame.

The CDC logic lives in get_changed_or_new_properties in solids_spark_delta.py, where I compare the existing properties in my Delta table with the new ones coming from the list function above. As Delta Lake supports an SQL API, I can use plain SQL to compare the two with this simple SELECT statement:

SELECT p.id, p.fingerprint, p.is_prefix, p.rentOrBuy, p.city, p.propertyType, p.radius, p.last_normalized_price
  FROM pd_properties p
  LEFT OUTER JOIN pd_existing_props e
    ON p.id = e.propertyDetails_id
 WHERE p.fingerprint != e.fingerprint
    OR e.fingerprint IS NULL
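To see how this query behaves, here is a small sketch that runs a reduced version of it on toy data. It uses Python's built-in sqlite3 module as a stand-in for Spark and Delta Lake, which is an assumption made only to keep the example self-contained. The table and join column names follow the statement above; the rows, the reduced column list, the simple string fingerprints, and the ORDER BY (added for a deterministic result) are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE pd_properties (
        id TEXT, fingerprint TEXT, last_normalized_price INTEGER
    );
    CREATE TABLE pd_existing_props (
        propertyDetails_id TEXT, fingerprint TEXT
    );
""")

# Freshly listed properties (toy data).
conn.executemany("INSERT INTO pd_properties VALUES (?, ?, ?)", [
    ("1", "1-500000", 500000),  # unchanged since the last run
    ("2", "2-690000", 690000),  # price was lowered from 720000
    ("3", "3-800000", 800000),  # not seen before
])

# Properties already stored from earlier runs (toy data).
conn.executemany("INSERT INTO pd_existing_props VALUES (?, ?)", [
    ("1", "1-500000"),
    ("2", "2-720000"),
])

query = """
SELECT p.id, p.fingerprint, p.last_normalized_price
  FROM pd_properties p
  LEFT OUTER JOIN pd_existing_props e
    ON p.id = e.propertyDetails_id
 WHERE p.fingerprint != e.fingerprint
    OR e.fingerprint IS NULL
 ORDER BY p.id
"""

# Keeps property 2 (fingerprint differs) and property 3 (no match, so
# e.fingerprint is NULL); property 1 is filtered out as unchanged.
changed_or_new = conn.execute(query).fetchall()
print(changed_or_new)
```

The IS NULL branch matters: for a property without a match, the comparison p.fingerprint != e.fingerprint evaluates to NULL rather than true, so new properties would be dropped without it.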

Origin: Building a Data Engineering Project in 20 Minutes, Change Data Capture (CDC)
References: Python, SQL, Debezium, OLTP, What is CDC (Change Data Capture)

