3. Python File & Memory Handling#

1. Generator Expression - Memory-Efficient File Processing#

What is a Generator?#

  • Produces values one at a time on demand
  • Does NOT load entire data into memory
  • Uses yield keyword or generator expressions
  • Critical for processing large files (GBs)
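
A minimal sketch of that on-demand behavior (function name and values are purely illustrative):

def count_up(limit):
    """Generator function - pauses at each yield, resumes on next()"""
    n = 0
    while n < limit:
        yield n            # hand back one value, then suspend
        n += 1

gen = count_up(3)
print(next(gen))   # 0 - computed only when asked
print(next(gen))   # 1 - nothing beyond this has been produced yet
print(list(gen))   # [2] - drains the remaining values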

The Problem with Loading Entire File:#

# ❌ Bad - loads ENTIRE file into memory at once
lines = open('logfile.txt').readlines()  # all lines in RAM
for line in lines:
    process(line)

# If file is 10GB → needs 10GB of RAM → crash

Generator Solution:#

# ✅ Good - reads ONE line at a time
for line in open('logfile.txt'):
    process(line)

# Only one line in memory at any time
# Works for files of ANY size

Generator Expression vs List Comprehension:#

# List comprehension - loads ALL into memory ❌
results = [process(line) for line in open('file.txt')]

# Generator expression - one at a time ✅
results = (process(line) for line in open('file.txt'))

# Difference:
# List: [...] → eager, all in memory
# Generator: (...) → lazy, one at a time

Generator with yield:#

def process_logs(filepath):
    """Generator function - yields one line at a time"""
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if line:               # skip empty lines
                yield parse_line(line)

# Usage - memory efficient
for entry in process_logs('access.log'):
    if entry['status'] == 404:
        print(entry['url'])

# vs loading all at once ❌
all_entries = list(process_logs('access.log'))  # defeats purpose

Practical Example - Large Log Processing:#

def process_large_log(filepath, filter_func):
    """
    Process large log file without memory issues
    ✅ Generator pattern - exam answer for large file processing
    """
    with open(filepath, 'r') as f:
        for line in f:              # reads ONE line at a time
            entry = parse_line(line.strip())
            if entry and filter_func(entry):
                yield entry

# Count POST requests - never loads full file
post_count = sum(
    1 for entry in process_large_log(
        'access.log',
        lambda e: e['method'] == 'POST'
    )
)

# Memory usage: constant regardless of file size ✅
# vs readlines(): proportional to file size ❌

2. readlines() - Why Avoid for Large Files#

# ❌ readlines() - loads entire file into list
lines = open('logfile').readlines()
# Returns: ['line1\n', 'line2\n', 'line3\n', ...]
# Problem: 10GB file = 10GB in RAM

# ❌ read() - even worse, single giant string
content = open('logfile').read()

# ❌ pandas for log files - loads everything
df = pd.read_csv('logfile.csv')  # all rows in RAM

# ✅ Correct alternatives:
# 1. Generator (for line in open())
# 2. pd.read_csv(chunksize=N)
# 3. gzip.open() for compressed files

Memory Comparison:#

File Size    readlines()    Generator
─────────────────────────────────────
1 MB         1 MB RAM       ~few KB
100 MB       100 MB RAM     ~few KB
1 GB         1 GB RAM       ~few KB
10 GB        CRASH          ~few KB
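
You can verify this yourself with tracemalloc from the standard library; the sketch below builds a small throwaway file (size and line count are arbitrary) and compares peak allocations:

import tempfile
import tracemalloc

# Build a small throwaway file to measure against
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as tmp:
    tmp.writelines(f"log line {i}\n" for i in range(200_000))
    path = tmp.name

# readlines(): every line becomes a string held in one big list
tracemalloc.start()
with open(path) as f:
    lines = f.readlines()
_, peak_readlines = tracemalloc.get_traced_memory()
tracemalloc.stop()
del lines

# Line-by-line iteration: only one line alive at a time
tracemalloc.start()
count = 0
with open(path) as f:
    for line in f:
        count += 1
_, peak_iter = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"readlines() peak: {peak_readlines:,} bytes")
print(f"iteration peak:   {peak_iter:,} bytes")   # orders of magnitude smaller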

3. pd.read_csv(chunksize=N) - Large CSV Processing#

Basic Usage:#

import pandas as pd

# ✅ Process large CSV in chunks
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
    # chunk is a regular DataFrame with 10000 rows
    process(chunk)

# Each chunk is processed and discarded before next is loaded
# Memory: only 10000 rows at a time regardless of file size

Complete Chunked Processing:#

import pandas as pd

def process_large_csv(filepath, chunksize=10000):
    """Process large CSV without memory issues"""
    results = []

    for i, chunk in enumerate(pd.read_csv(filepath, chunksize=chunksize)):
        print(f"Processing chunk {i+1}...")

        # Apply operations to chunk
        chunk['normalized'] = chunk['name'].str.lower()
        filtered = chunk[chunk['sales'] > 1000]
        results.append(filtered)

    # Combine results
    final = pd.concat(results, ignore_index=True)
    return final

# Usage
df = process_large_csv('sales_data.csv')

Aggregation Across Chunks:#

import pandas as pd

# ✅ Calculate statistics without loading all data
total_sales = 0
count = 0

for chunk in pd.read_csv('sales.csv', chunksize=10000):
    total_sales += chunk['sales'].sum()
    count += len(chunk)

average_sales = total_sales / count
print(f"Average sales: {average_sales:.2f}")

Chunked Processing with Filter:#

# Find all rows matching condition in large file
matching_rows = []

for chunk in pd.read_csv('data.csv', chunksize=10000):
    # Filter within chunk
    filtered = chunk[
        (chunk['status'] == 'active') &
        (chunk['amount'] > 1000)
    ]
    matching_rows.append(filtered)

result = pd.concat(matching_rows, ignore_index=True)

Options with read_csv:#

# Useful options for large files
pd.read_csv(
    'large.csv',
    chunksize=10000,      # rows per chunk ✅
    usecols=['a', 'b'],   # load only needed columns → saves memory
    dtype={'id': str},    # specify dtypes → prevents auto-conversion
    nrows=1000,           # load only first N rows (for testing)
    skiprows=range(1, 100), # skip data rows 1-99 (header row 0 is kept)
    encoding='utf-8',       # specify encoding
    low_memory=False        # consistent dtype inference (reads more at once → more RAM)
)
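
For instance, a sketch combining several of these options on a hypothetical large CSV (column names 'id' and 'sales' are assumed for illustration):

import pandas as pd

total = 0.0
for chunk in pd.read_csv(
    'large.csv',
    chunksize=10_000,        # stream 10k rows at a time
    usecols=['id', 'sales'], # load only the columns we need
    dtype={'id': str},       # keep IDs as strings (no lossy auto-conversion)
):
    total += chunk['sales'].sum()

print(f"Total sales: {total:.2f}")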

4. gzip.open() - Reading Compressed Files#

Why Gzipped Files?#

  • Log files are often compressed to save disk space
  • .gz extension = gzip compressed
  • Python can read them directly without manual decompression

Basic Usage:#

import gzip

# ✅ Read gzipped text file
with gzip.open('logfile.gz', 'rt') as f:  # 'rt' = read text
    for line in f:                         # generator pattern
        process(line)

# Read entire compressed file (small files only)
with gzip.open('logfile.gz', 'rt') as f:
    content = f.read()

# Write gzipped file
with gzip.open('output.gz', 'wt') as f:
    f.write("compressed content")

Mode Options:#

gzip.open('file.gz', 'rt')   # read text ✅ most common
gzip.open('file.gz', 'rb')   # read binary
gzip.open('file.gz', 'wt')   # write text
gzip.open('file.gz', 'wb')   # write binary
gzip.open('file.gz', 'at')   # append text
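
A small round-trip sketch of these modes (the filename and contents are illustrative):

import gzip

# Write a compressed text file with 'wt'
with gzip.open('sample.log.gz', 'wt', encoding='utf-8') as f:
    f.write("GET /index.html 200\n")
    f.write("POST /api/login 401\n")

# Read it back line by line with 'rt'
with gzip.open('sample.log.gz', 'rt', encoding='utf-8') as f:
    for line in f:
        print(line.strip())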

Complete Log Analysis - Gzipped Apache Log:#

import gzip
from datetime import datetime

def analyze_gzipped_log(filepath):
    """
    Analyze gzipped Apache log file
    Uses generator pattern for memory efficiency
    """
    post_count = 0
    status_counts = {}

    with gzip.open(filepath, 'rt') as f:
        for line in f:                    # generator - one line at a time
            entry = parse_apache_log(line.strip())
            if entry is None:
                continue

            # Count POST requests
            if entry['method'] == 'POST':
                post_count += 1

            # Count by status code
            status = entry['status']
            status_counts[status] = status_counts.get(status, 0) + 1

    return post_count, status_counts

# Run analysis
posts, statuses = analyze_gzipped_log('access.log.gz')
print(f"POST requests: {posts}")
print(f"Status distribution: {statuses}")

pandas with Gzipped CSV:#

# pandas auto-detects .gz extension
df = pd.read_csv('data.csv.gz')           # small files
for chunk in pd.read_csv('data.csv.gz', chunksize=10000):  # large files
    process(chunk)
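
Writing compressed output works the same way; a short sketch (compression= is passed explicitly rather than relying on extension inference):

import pandas as pd

df = pd.DataFrame({'id': [1, 2], 'sales': [150.0, 2400.0]})
df.to_csv('data.csv.gz', index=False, compression='gzip')

# Reads back transparently, as above
round_trip = pd.read_csv('data.csv.gz')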

5. with open() - Context Manager#

Why Use Context Manager?#

  • Automatically closes file when done
  • Works even if exception occurs
  • Prevents resource leaks

# ✅ Always use context manager
with open('file.txt', 'r') as f:
    content = f.read()
# File automatically closed here, even if error occurs

# ❌ Bad - manual close, risky
f = open('file.txt', 'r')
content = f.read()
f.close()  # might not run if error occurs above
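
Under the hood, the context manager is roughly equivalent to this try/finally sketch:

f = open('file.txt', 'r')
try:
    content = f.read()
    process(content)    # even if this raises an exception...
finally:
    f.close()           # ...the file still gets closed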

File Modes:#

open('file.txt', 'r')    # read text (default)
open('file.txt', 'w')    # write text (creates/overwrites)
open('file.txt', 'a')    # append text
open('file.txt', 'x')    # exclusive creation (fails if exists)
open('file.txt', 'rb')   # read binary
open('file.txt', 'wb')   # write binary
open('file.txt', 'r+')   # read and write

Reading Options:#

with open('file.txt', 'r') as f:
    # (each call below is shown independently - the file cursor advances
    #  after every read, so in practice pick one approach per open)

    # Read entire file
    content = f.read()          # single string

    # Read all lines into list
    lines = f.readlines()       # ['line1\n', 'line2\n']

    # Read one line
    line = f.readline()         # 'line1\n'

    # ✅ Iterate line by line (memory efficient)
    for line in f:
        process(line.strip())   # strip removes \n

Writing Options:#

with open('output.txt', 'w') as f:
    f.write("Hello World\n")        # write string
    f.writelines(["line1\n", "line2\n"])  # write list

# Append to existing file
with open('output.txt', 'a') as f:
    f.write("New line\n")

Encoding:#

# Always specify encoding for non-ASCII content
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

with open('file.txt', 'w', encoding='utf-8') as f:
    f.write("Content with unicode: é, ü, 中文")

6. os.path - File Path Operations#

import os

# Check existence
os.path.exists('file.txt')        # True if exists (file or dir)
os.path.isfile('file.txt')        # True if file
os.path.isdir('folder/')          # True if directory

# Path manipulation
os.path.join('folder', 'file.txt')   # 'folder/file.txt' (OS-safe)
os.path.basename('/path/to/file.txt') # 'file.txt'
os.path.dirname('/path/to/file.txt')  # '/path/to'
os.path.splitext('file.txt')          # ('file', '.txt')
os.path.abspath('file.txt')           # absolute path

# File info
os.path.getsize('file.txt')           # size in bytes
os.path.getmtime('file.txt')          # last modified time

# Directory operations
os.listdir('.')                       # list directory
os.makedirs('path/to/dir', exist_ok=True)  # create dirs
os.remove('file.txt')                 # delete file
os.rename('old.txt', 'new.txt')       # rename file

# Environment
os.getcwd()                           # current working directory
os.chdir('/path/to/dir')              # change directory
os.environ.get('API_KEY')             # get environment variable
os.getenv('API_KEY', 'default')       # with default value
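
os.listdir() is not recursive; a sketch using os.walk() to find files in nested directories (the 'logs' directory and '.log' suffix are illustrative):

import os

log_files = []
for root, dirs, files in os.walk('logs'):   # visits every subdirectory
    for name in files:
        if name.endswith('.log'):
            log_files.append(os.path.join(root, name))

print(log_files)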

7. pathlib.Path - Modern File Path Handling#

Why pathlib?#

  • More readable and Pythonic than os.path
  • Works consistently across Windows/Mac/Linux
  • Object-oriented approach

from pathlib import Path

# Create path object
p = Path('data/file.txt')
p = Path('/absolute/path/file.txt')

# Check existence
p.exists()           # True/False
p.is_file()          # True if file
p.is_dir()           # True if directory

# Path components
p.name               # 'file.txt'
p.stem               # 'file'
p.suffix             # '.txt'
p.parent             # Path('data')
p.parts              # ('data', 'file.txt')

# Build paths (OS-safe)
base = Path('data')
full = base / 'subfolder' / 'file.txt'  # ✅ clean syntax

# File operations
p.read_text()              # read entire file as string
p.write_text("content")    # write string to file
p.read_bytes()             # read as bytes
p.write_bytes(data)        # write bytes

# Create directories
Path('new/nested/dir').mkdir(parents=True, exist_ok=True)

# List directory
for file in Path('data').iterdir():
    print(file)

# Find files by pattern
for csv_file in Path('.').glob('**/*.csv'):  # recursive
    print(csv_file)

for log_file in Path('logs').glob('*.log'):  # non-recursive
    print(log_file)

# Delete
p.unlink()                 # delete file
p.rmdir()                  # delete empty directory

# Rename/move
p.rename('new_name.txt')
p.replace('new_location/file.txt')
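
A short sketch combining several of these operations: total size of every CSV under the current directory (recursive glob plus stat()):

from pathlib import Path

total_bytes = sum(p.stat().st_size for p in Path('.').glob('**/*.csv'))
print(f"CSV data on disk: {total_bytes / 1_048_576:.1f} MB")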

8. tempfile - Temporary Files#

Why Temporary Files?#

  • Safe place to write data during processing
  • Automatically cleaned up
  • Essential for idempotent ETL pipelines

import tempfile
import os
import shutil

# Create temporary file
with tempfile.NamedTemporaryFile(
    mode='w',
    suffix='.csv',
    delete=False          # don't delete on close
) as tmp:
    tmp.write("data,goes,here\n")
    tmp_path = tmp.name   # save path for later

# File persists after 'with' block (because delete=False)
print(f"Temp file at: {tmp_path}")

# ✅ Idempotent ETL pattern:
def safe_write_output(data, final_path):
    """Write to temp first, then atomic move to final"""

    # Step 1: Write to temp file in the SAME directory as the target,
    # so the final move is a same-filesystem rename (atomic)
    tmp = tempfile.NamedTemporaryFile(
        mode='w',
        suffix='.csv',
        dir=os.path.dirname(final_path) or '.',
        delete=False
    )
    try:
        for row in data:
            tmp.write(row + '\n')
        tmp.close()

        # Step 2: Atomic move to final destination
        shutil.move(tmp.name, final_path)  # ✅ atomic
        print(f"Output written to: {final_path}")

    except Exception:
        # Clean up temp file on error, then re-raise with the original traceback
        tmp.close()
        os.unlink(tmp.name)
        raise

# Create temp directory
with tempfile.TemporaryDirectory() as tmpdir:
    # Work inside temp directory
    work_file = os.path.join(tmpdir, 'work.csv')
    # Automatically deleted when 'with' block exits
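
Hypothetical usage of the safe-write pattern above (rows and path are made up):

rows = ["id,amount", "1,100", "2,250"]
safe_write_output(rows, 'report.csv')   # report.csv appears only once fully written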

9. shutil - File Operations#

import shutil

# Copy file
shutil.copy('source.txt', 'dest.txt')        # copy file
shutil.copy2('source.txt', 'dest.txt')       # copy with metadata
shutil.copytree('source_dir/', 'dest_dir/')  # copy directory

# Move file (ATOMIC on same filesystem) ✅
shutil.move('temp_output.csv', 'final_output.csv')

# Delete
shutil.rmtree('directory/')   # delete directory and contents

# Archive
shutil.make_archive('output', 'zip', 'folder/')  # create zip
shutil.unpack_archive('archive.zip', 'dest/')    # extract

# Disk usage
shutil.disk_usage('/')        # total, used, free bytes

Why shutil.move() is Atomic:#

# ✅ Atomic move (a rename on the same filesystem) - all or nothing
shutil.move('temp.csv', 'final.csv')
# If a crash occurs DURING the move:
# → final.csv has either the complete OLD content or the complete NEW content
# → Never a partial/corrupt file

# ❌ Non-atomic write
with open('final.csv', 'w') as f:
    for row in data:
        f.write(row)    # crash here = partial/corrupt file

Memory Management - Complete Reference#

Python Memory Concepts:#

import sys

# Check object size in bytes
sys.getsizeof([1, 2, 3])          # size of list
sys.getsizeof("hello")            # size of string

# Memory-efficient alternatives:
# 1. Generator vs List
gen = (x**2 for x in range(1000000))  # tiny memory
lst = [x**2 for x in range(1000000)]  # large memory

# 2. numpy arrays vs Python lists
import numpy as np
arr = np.array([1, 2, 3], dtype=np.int32)  # 4 bytes per element
lst = [1, 2, 3]                             # ~28 bytes per element

# 3. Chunked processing vs loading all
# (covered in pd.read_csv(chunksize=N))

# 4. Delete large objects when done
large_data = load_large_dataset()
process(large_data)
del large_data                    # free memory
import gc
gc.collect()                      # force garbage collection

Quick Reference - When to Use What#

File Size          Approach
───────────────────────────────────────────────
Small (<100MB)     → pd.read_csv() or open().read()
Medium (100MB-1GB) → pd.read_csv(chunksize=N)
Large (>1GB)       → Generator: for line in open()
Compressed (.gz)   → gzip.open('file.gz', 'rt')
CSV compressed     → pd.read_csv('file.csv.gz')

File Operation   → Method
───────────────────────────────────────────────
Read text file   → with open('f', 'r') as f
Write safely     → tempfile + shutil.move() ✅
Check existence  → os.path.exists() or Path.exists()
Build paths      → os.path.join() or Path / 'file'
Find files       → Path.glob('**/*.csv')
Process large    → Generator expression ✅
Chunk large CSV  → pd.read_csv(chunksize=N) ✅