How to Build a Data Pipeline for API Integration Using Python and PostgreSQL
In today’s data-driven world, fetching, storing, and analyzing data from APIs is an essential task for data engineers and analysts. Whether you’re working with real-time weather data, social media feeds, IoT sensors, or financial APIs, an efficient data pipeline can save you hours of manual work and ensure data consistency.
In this step-by-step guide, you’ll learn how to:
- Fetch data from APIs using requests
- Transform JSON data for efficient storage
- Load the processed data into a PostgreSQL database
- Automate the entire data pipeline
- Query the data for analysis
Step 1: Setting Up Your Environment
Make sure you have Python and PostgreSQL installed on your machine. Then, install the required Python libraries:
pip install requests psycopg2 pandas
- requests: For making HTTP requests to fetch API data.
- psycopg2: PostgreSQL adapter for Python (if the source build fails on your machine, psycopg2-binary is a common drop-in for development).
- pandas: For data manipulation and transformation.
Setting Up the PostgreSQL Database
Create a new database to store API data:
CREATE DATABASE api_data;
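Before moving on, it can help to confirm that Python can reach the new database. A minimal connection check, assuming placeholder credentials (your_user / your_password) that you should replace with your own:

import psycopg2

# Placeholder credentials -- replace with your actual user and password
conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
print("Connected to PostgreSQL successfully!")
conn.close()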
Step 2: Fetching Data from an API
Let’s use the free Open-Meteo API, which requires no API key, to fetch hourly temperature data for Tokyo.
import requests
# Define the API endpoint
api_url = "https://api.open-meteo.com/v1/forecast"
params = {
    "latitude": 35.6895,
    "longitude": 139.6917,
    "hourly": "temperature_2m"
}
# Make a GET request
response = requests.get(api_url, params=params)
if response.status_code == 200:
    data = response.json()
    print("Data fetched successfully!")
else:
    print("Failed to fetch data:", response.status_code)
Tip: Always check response.status_code to avoid unexpected pipeline failures.
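If you prefer the request to fail loudly instead of just printing a message, you can wrap the call in a small helper. A minimal sketch that reuses api_url and params from above; the function name fetch_weather and the 10-second timeout are illustrative choices, not part of the requests API:

def fetch_weather(url, params, timeout=10):
    # raise_for_status() raises an HTTPError for 4xx/5xx responses
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()

data = fetch_weather(api_url, params)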
Step 3: Transforming Data for Storage
APIs often return data in JSON format. Let’s convert it into a structured Pandas DataFrame.
import pandas as pd
# Extract temperature and timestamps
temperature_data = data['hourly']['temperature_2m']
timestamps = data['hourly']['time']
# Create DataFrame
df = pd.DataFrame({'timestamp': timestamps, 'temperature': temperature_data})
# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
print(df.head())
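Depending on the API, some readings may be missing or repeated. A light, optional cleaning pass before loading, assuming you want to drop incomplete rows (this is a choice, not an Open-Meteo requirement):

# Optional: drop rows with missing temperatures and duplicate timestamps
df = df.dropna(subset=['temperature'])
df = df.drop_duplicates(subset=['timestamp'])
print(f"{len(df)} rows ready to load")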
Step 4: Creating a Table in PostgreSQL
Create a table to store the transformed data:
CREATE TABLE weather_data (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMP,
    temperature FLOAT
);
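If you prefer to run the DDL from Python instead of psql, a minimal sketch using the same placeholder credentials; CREATE TABLE IF NOT EXISTS makes the script safe to re-run:

import psycopg2

conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS weather_data (
        id SERIAL PRIMARY KEY,
        timestamp TIMESTAMP,
        temperature FLOAT
    )
""")
conn.commit()
cur.close()
conn.close()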
Step 5: Loading Data into PostgreSQL
Insert the DataFrame into the PostgreSQL database:
import psycopg2
# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()
# Insert data row by row
for _, row in df.iterrows():
    cur.execute(
        "INSERT INTO weather_data (timestamp, temperature) VALUES (%s, %s)",
        (row['timestamp'], row['temperature'])
    )
# Commit and close connection
conn.commit()
cur.close()
conn.close()
print("Data inserted into PostgreSQL successfully!")
Step 6: Automating the Data Pipeline
Schedule the script so that fetching, transformation, and loading run on a regular cadence without manual intervention.
Automation with Cron (Linux/Mac)
crontab -e
0 0 * * * /usr/bin/python3 /path/to/data_pipeline.py
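The entry above runs the script every day at midnight. If you also want a record of each run, you can redirect the script's output to a log file (both paths are placeholders):

0 0 * * * /usr/bin/python3 /path/to/data_pipeline.py >> /path/to/data_pipeline.log 2>&1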
Automation with Task Scheduler (Windows)
- Open Task Scheduler
- Create a new task and set a trigger
- Set action:
python C:\path\to\data_pipeline.py
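Both schedulers assume a single script, data_pipeline.py, that runs the whole fetch-transform-load sequence end to end. A minimal sketch of what that script might look like; the credentials are placeholders and the logging setup is just one reasonable option:

import logging
import requests
import pandas as pd
import psycopg2

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

def run_pipeline():
    # Fetch hourly temperatures from Open-Meteo
    response = requests.get(
        "https://api.open-meteo.com/v1/forecast",
        params={"latitude": 35.6895, "longitude": 139.6917, "hourly": "temperature_2m"},
        timeout=10,
    )
    response.raise_for_status()
    data = response.json()

    # Transform the JSON payload into a DataFrame
    df = pd.DataFrame({
        "timestamp": pd.to_datetime(data["hourly"]["time"]),
        "temperature": data["hourly"]["temperature_2m"],
    })

    # Load the rows into PostgreSQL (placeholder credentials)
    conn = psycopg2.connect(
        dbname="api_data", user="your_user", password="your_password", host="localhost"
    )
    cur = conn.cursor()
    for _, row in df.iterrows():
        cur.execute(
            "INSERT INTO weather_data (timestamp, temperature) VALUES (%s, %s)",
            (row["timestamp"], row["temperature"]),
        )
    conn.commit()
    cur.close()
    conn.close()
    logging.info("Loaded %d rows into weather_data", len(df))

if __name__ == "__main__":
    run_pipeline()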
Tip: Use Python’s logging module to log pipeline activity, as in the sketch above, so scheduled runs leave a trail you can inspect when something goes wrong.
Step 7: Querying Data for Analysis
With the pipeline in place, you can query the stored readings, for example everything from the last 24 hours:
import psycopg2

conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()
cur.execute("SELECT * FROM weather_data WHERE timestamp > NOW() - INTERVAL '1 day'")
rows = cur.fetchall()
for row in rows:
    print(row)
cur.close()
conn.close()
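For a quick summary instead of raw rows, you can aggregate directly in SQL. A small sketch that computes the average temperature per day, using the same placeholder credentials:

import psycopg2

conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()

# Average temperature per day
cur.execute("""
    SELECT weather_data.timestamp::date AS day, AVG(temperature) AS avg_temp
    FROM weather_data
    GROUP BY day
    ORDER BY day
""")
for day, avg_temp in cur.fetchall():
    print(day, round(avg_temp, 2))

cur.close()
conn.close()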
Conclusion
By following this hands-on guide, you’ve learned how to build a complete data pipeline using Python and PostgreSQL. From fetching API data to transforming, storing, automating, and analyzing it, you now have the foundation to streamline your data workflows.