Input: Bhanu11032024@gmail.com
Output: Valid Email
Input: my.edzeq@our-site.org
Output: Valid Email
Input: Bhanuedzeq.com
Output: Invalid Email
-----------------------------------------------
Code
-----------------------------------------------
import re

# Regular expression for validating an email address
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'

# Define a function for validating an email
def check(email):
    # Pass the regular expression and the string into the fullmatch() method
    if re.fullmatch(regex, email):
        print("Valid Email")
    else:
        print("Invalid Email")

# Driver code
if __name__ == '__main__':
    email = "Bhanu11032024@gmail.com"
    check(email)

    email = "my.edzeq@our-site.org"
    check(email)

    email = "Bhanuedzeq.com"
    check(email)
--------------------------------------------------------------
Output
Valid Email
Valid Email
Invalid Email
#dataengineer #datascience #Python #pyspark #sql #Email #validation #verification #emailaddress #regularexpression #regex
When dealing with situations where the number of columns in the incoming data can vary, it's essential to create a flexible and robust data processing pipeline that can handle such changes. Here are some steps you can take to handle this scenario in PySpark:
Identify the missing column: Determine which column is missing from the incoming data. You can compare the schema of the incoming data with the expected schema to find any discrepancies.
Add the missing column with default values: If a column is missing from the incoming data, you can add it to the DataFrame with default values. This can be done using the withColumn() method along with the lit() function to create a constant value column.
Proceed with data processing: Once the missing column is added with default values, you can continue processing the data as usual, ensuring your pipeline can handle the varying number of columns.
Here's a PySpark example that demonstrates how to handle missing columns:
---------------------------------------------------------------------------------------------------------------------
Code
---------------------------------------------------------------------------------------------------------------------
from pyspark.sql.functions import lit
# Load incoming data (assuming 9 columns)
incoming_data = spark.read.csv("path/to/incoming/data", header=True, inferSchema=True)

# Define the expected schema (assuming 10 columns)
expected_columns = ["column1", "column2", ..., "column10"]

# Find the missing columns
missing_columns = set(expected_columns) - set(incoming_data.columns)

# Add the missing columns with default values (assuming default value is None)
for column in missing_columns:
    incoming_data = incoming_data.withColumn(column, lit(None))

# Proceed with data processing
---------------------------------------------------------------------------------------------------------------------
In this example, the incoming data is loaded into a DataFrame, and the missing column is identified by comparing the expected schema with the actual schema. Then, the missing column is added to the DataFrame with default values (None in this case). Once the missing column is added, you can continue processing the data as usual.
Keep in mind that this approach assumes you have a predefined expected schema. If the schema can change frequently or is not known in advance, you may need to develop a more dynamic approach to handle such changes. Additionally, you may need to handle more complex scenarios, such as data type changes or column order changes, depending on the requirements of your data processing pipeline.
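For illustration, here is a minimal sketch of such a dynamic approach, assuming the expected schema is kept as a StructType that carries both column names and data types. The expected_schema contents and the align_to_schema helper are hypothetical placeholders, not part of the PySpark API:
---------------------------------------------------------------------------------------------------------------------
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical expected schema; replace with whatever your pipeline defines
expected_schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", IntegerType(), True),
    # add the remaining expected fields here
])

def align_to_schema(df, schema):
    # Add missing columns as nulls, cast existing columns to the expected types,
    # then select in the expected order (this also drops unexpected extra columns)
    for field in schema.fields:
        if field.name not in df.columns:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
        else:
            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
    return df.select([field.name for field in schema.fields])

aligned_data = align_to_schema(incoming_data, expected_schema)
---------------------------------------------------------------------------------------------------------------------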
#pyspark #python #dataengineer #developer #data #It #software #softwarejob
Handling bad or corrupt data is an essential part of data processing pipelines. Bad data can be caused by various reasons, such as data entry errors, system glitches, or incorrect data formats. Here are some strategies for handling bad data in #PySpark:
Data validation and filtering: Perform data validation and filtering during the data ingestion process. Use PySpark's built-in functions and User-Defined Functions (#UDFs) to validate the data and filter out any bad or corrupt records.
-----------------------------------------------------------------
# Define a validation function for your data
def is_valid(record):
    # Add your validation logic here; the placeholder below only checks for non-null records
    return record is not None

# Filter the DataFrame using the validation function
valid_data = dataframe.rdd.filter(is_valid).toDF()
-----------------------------------------------------------------
Schema validation: Define a schema for your data and use it when reading the data. PySpark will automatically validate the data against the schema, and you can choose how to handle the bad records.
-----------------------------------------------------------------
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema for your data
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
# Read the data using the schema and set the 'mode' option to 'DROPMALFORMED' to drop bad records
dataframe = spark.read.csv("path/to/data.csv", schema=schema, mode="DROPMALFORMED")
-----------------------------------------------------------------
Use try-except blocks in UDFs: When using User-Defined Functions (UDFs) to process your data, use try-except blocks to catch and handle exceptions caused by bad data. You can either filter out the bad data or replace it with a default value.
-----------------------------------------------------------------
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def process_data(value):
    try:
        # Add your data processing logic here
        result = value * 2
    except Exception as e:
        result = None  # Replace bad data with a default value or raise an error
    return result
process_data_udf = udf(process_data, IntegerType())
# Apply the UDF to the DataFrame
processed_data = dataframe.withColumn("processed_column", process_data_udf("input_column"))
-----------------------------------------------------------------
Log and analyze bad data: Keep track of bad data by logging it or storing it in a separate location, such as a database or file system. This can help you analyze the root causes of the bad data and take corrective actions to prevent it in the future.
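As an example, here is a minimal sketch of quarantining bad CSV rows for later analysis using PERMISSIVE mode with a corrupt-record column; the _corrupt_record column name and the quarantine path are only illustrative choices:
-----------------------------------------------------------------
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Include a string column in the schema to capture the raw text of malformed rows
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

raw = (spark.read
       .option("mode", "PERMISSIVE")
       .option("columnNameOfCorruptRecord", "_corrupt_record")
       .schema(schema)
       .csv("path/to/data.csv"))
raw.cache()  # cache before filtering on the corrupt-record column

# Split good and bad rows; write the bad rows to a quarantine location
bad_data = raw.filter(raw["_corrupt_record"].isNotNull())
good_data = raw.filter(raw["_corrupt_record"].isNull()).drop("_corrupt_record")
bad_data.write.mode("append").json("path/to/quarantine/bad_records")
-----------------------------------------------------------------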
Use data quality tools: Consider using data quality tools and libraries, such as Apache Griffin or Deequ, to monitor, validate, and improve data quality in your PySpark applications.
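As a rough sketch of what a Deequ-style check might look like through the PyDeequ wrapper (assuming pydeequ is installed and the Deequ jar is available to the Spark session; consult the library's documentation for the exact setup):
-----------------------------------------------------------------
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Declare simple constraints: "name" must have no nulls, "age" must be >= 0
check = Check(spark, CheckLevel.Error, "Basic data quality checks")
result = (VerificationSuite(spark)
          .onData(dataframe)
          .addCheck(check.isComplete("name").isNonNegative("age"))
          .run())

# Inspect which constraints passed or failed
VerificationResult.checkResultsAsDataFrame(spark, result).show()
-----------------------------------------------------------------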
In summary, handling bad data in PySpark involves validating, filtering, and processing the data using built-in functions, UDFs, and schema validation. It's essential to track and analyze bad data to understand its causes and improve data quality in your applications.
Details: Self Intro: 3-6 mins; 30 mins / 24 mins; 15 ETL, Agile & DWH + 9 SQL
1. Professional Information
2. Project Details
   1. Architecture
   2. Roles
   3. Responsibilities
   4. Technologies/tools
   5. Skills
3. Hobbies
Main Self intro:
Professional Details:
I am Bhanu, and I have XX years of experience in ETL Testing, Application Testing, Database Testing, DWH Testing, and Report Testing.
Currently, I am working with EdzeQ Technologies.
Moving on to the project details:
I am working on the ABC project, and the client is XYZ.
Coming to the project architecture:
The project DWH is built on the Snowflake database; the sources are an Oracle/RDBMS database, and we have flat files as well.
We extract data from the legacy database (Oracle) and the flat files and load it into the Snowflake environment as the target DB.
This Snowflake DWH is built on a three-layer architecture:
1. Raw: used to bring data in different formats into table form;
we copy the data from the source systems and load it into the Raw tables.
2. Unification: used to separate the good data from the bad data and load it into the unification layer,
which holds the good data for the business.
3. Curated/Target layer: used to transform the unification data as per the business/client requirements by applying all the transformation logic.
If the data satisfies the logic, it is loaded into the curated/target layer.
The curated layer is then integrated with the reporting tool to generate reports for the business.
Coming to the roles and responsibilities:
I am working as a Sr. ETL Tester on this project.
My responsibilities are to make sure the business needs are met:
I gather the business requirements,
create the test scenarios and test cases as per the user stories (business requirements),
get a peer review and approval from the QA Manager,
and then execute the test cases and verify the execution results.
If everything looks good, I close the user story; otherwise, I raise a defect/bug ticket for the failed test case.
We work in Agile methodologies.
I participate in all Agile ceremonies (meetings).
I coordinate with the team and the BA to clarify doubts.
I work with clients directly to get updates.
I prepare DSR/WSR reports and send them to the clients and the Manager.
Coming to the technologies/tools:
Jira, Zephyr, Confluence, Oracle, Snowflake, Informatica/dbt, SnowSQL
Coming to my skills:
I am good at writing SQL queries.
I am good at using Unix commands.
I am good at using cloud services: AWS/Azure/GCP.
I am good at working with semi-structured data like JSON and XML.