Blog of Alex Wlchan

Watching for file changes on macOS

2026-05-11 14:41:24

When I’m working on this website, I want a local server with live reload. I want to be able to open the site in my web browser, make changes to the source files, and have my browser automatically refresh the page when the site is updated. I use this whenever I’m working on the site, and I find it helpful to see my writing in a different font/layout to my text editor; I spot lots of typos and mistakes that way.

When I was using Jekyll, I used the command jekyll serve --livereload. Now I’ve written my own static site generator, I need to build my own version. This was a fun challenge, because it touched a number of areas I’ve not worked in before – macOS filesystem events, non-blocking I/O, and HTTP long polling.

In this post I’ll explain how I detect changes to source files to trigger a rebuild; in my next post I’ll explain how that automatically refreshes any open pages in my browser. First we’re going to build a Swift script that detects changes using the FSEvents API, then we’ll get that information into a Python script.

Rejected approaches

Using third-party libraries. Initially I was using the python-livereload library, but I wanted to replace it with my own implementation – partly to remove a dependency, partly to understand how this functionality works. There are other Python libraries that offer filesystem watching, including fswatch, inotify, and watchdog, but I didn’t want to use them for similar reasons.

I have an advantage over these library authors – while they aim to support cross-platform filesystem watching, I only have to get it working on macOS. Specifically, the exact versions of macOS that my Macs are running, and no others. This means I can write a smaller, more focused bit of code.

Polling the source files. This is easy to write, but I have enough source files that it’s surprisingly slow – about 90ms to scan 13,000 source files, and I’m worried about the effect on power consumption and the lifespan of my SSD if I polled in a hot loop. For comparison, my final code only takes 2–4ms to detect a change and trigger a new build, and it’s very judicious about CPU cycles and disk reads.
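To make the comparison concrete, here's a minimal sketch of the polling approach (an illustration of the technique, not the code I benchmarked – the cost comes from re-walking the whole tree on every tick):

```python
import os
import time


def snapshot(root: str) -> dict[str, float]:
    """Record the modification time of every file under `root`."""
    mtimes = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            mtimes[path] = os.path.getmtime(path)
    return mtimes


def poll_for_changes(root: str, interval: float = 0.1):
    """Yield the set of changed paths each time the tree changes."""
    previous = snapshot(root)
    while True:
        time.sleep(interval)
        current = snapshot(root)
        if current != previous:
            # A path changed if it was added, removed, or has a new mtime
            changed = {
                p for p in current.keys() | previous.keys()
                if current.get(p) != previous.get(p)
            }
            yield changed
            previous = current
```

Every pass calls snapshot(), which stats every file in the tree – that's a stat call per file on every tick, even when nothing has changed.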

The macOS FSEvents API

Setting up the event stream

There are several ways to detect changes to files on macOS; I’m going to use the File System Events API (also called “FSEvents” for short). This allows you to receive notifications about any changes to a directory tree, or files within it. One of the main purposes of this API is to allow backup software to detect incremental changes without continuously rescanning an entire tree, but we can use it for other things.

Apple has a File System Events Programming Guide which explains the FSEvents API in detail, and confirms it’s exactly what I need: “The file system events API is designed for passively monitoring a large tree of files for changes”. It mentions a couple of alternatives – kernel extensions for getting immediate notifications and pre-empting file changes, or kqueues for monitoring changes to a single file – but they’re not what I need, so I didn’t explore them further.

The guide is a little outdated, but the broad strokes are still correct. In Using the File System Events API, it explains the lifecycle of a file system events stream: create a stream, start listening, receive notifications, trigger a callback you provide, stop listening, release the stream.

Let’s start with a script that prints a static message whenever it sees a change – we don’t care about what file it was yet, for now we just want to know when any file changed.

Here are the steps:

  1. Create a file system events stream using FSEventStreamCreate. This function takes a lot of arguments and you can’t use named arguments, so I found it helpful to define each argument as a variable, then pass those variables into the function. I wrapped my FSEventStreamCreate call in another function:

    import Foundation
    
    /// Create a new file system events stream that watches for changes
    /// in the given directories.
    func createFSEventStream(_ pathsToWatch: [String]) -> FSEventStreamRef {
      let callback: FSEventStreamCallback = { (_, _, _, _, _, _) in
        print("Detected file change!")
    
        // Flush stdout to ensure it's printed immediately
        fflush(stdout)
      }
    
      let context: UnsafeMutablePointer<FSEventStreamContext>? = nil
      let sinceWhen = FSEventStreamEventId(kFSEventStreamEventIdSinceNow)
      let latency = 0.01
      let flags = FSEventStreamCreateFlags()
    
      guard let eventStream = FSEventStreamCreate(
        kCFAllocatorDefault, callback, context, pathsToWatch as CFArray, sinceWhen, latency, flags
      ) else {
        fatalError("Failed to create FSEventStream: check your paths or permissions.")
      }
    
      return eventStream
    }

    The callback function is an instance of FSEventStreamCallback, which will be called whenever a file changes. The arguments contain information about the file which just changed. For now we ignore all of that information, and just print a static message.

    The context argument allows us to attach some context to the stream. I’m not sure what it’s for – perhaps for applications that have multiple event streams, and need to distinguish between them in the callback? I don’t think I need this, and the docs say I can pass NULL, so that’s what I’ve done.

    The sinceWhen argument asks for events that happened after a given event ID. I imagine this is useful for long-running applications like backup software – it means they can resume an event stream if the app is quit and relaunched, without rescanning the tree on every app launch. I just need events from when the script started running, so I can use the kFSEventStreamEventIdSinceNow constant.

    The latency argument is how long the OS will wait to coalesce rapid-fire events into a single event. A shorter latency means you get notifications faster, but you’ll get more of them. I’ll implement my own event coalescing later, so I set this quite low and accept the extra events.

    The flags modify the behaviour of the event stream. We’re using the defaults for now; we’ll come back and add some more later.

    Finally, we create the event stream by using FSEventStreamCreate. This returns an Optional value which can be nil if the stream wasn’t created successfully; for example, if you try to watch a directory that doesn’t exist or which you don’t have permission to read.

  2. Choose the folders you want to watch. For this initial script, we’ll use two folders that should be present on every Mac: the user’s Desktop and Documents folder.

    let home = URL(fileURLWithPath: NSHomeDirectory())
    let pathsToWatch = [
      home.appendingPathComponent("Desktop").path,
      home.appendingPathComponent("Documents").path
    ]
  3. Schedule the event stream and start listening for changes.

    The FS Events Guide tells you to use FSEventStreamScheduleWithRunLoop, but that function has been deprecated for several years. The recommended replacement is FSEventStreamSetDispatchQueue:

    let queue = DispatchQueue(label: "net.alexwlchan.watch_file_changes")
    FSEventStreamSetDispatchQueue(eventStream, queue)
    FSEventStreamStart(eventStream)
    
    print("Listening for changes in \(pathsToWatch.joined(separator: ", "))")
    
    dispatchMain()
  4. Clean up the event stream when we’re done. In a simple script like mine that might not be necessary – the system probably cleans up an event stream if it’s not used for a while – but it’s good hygiene and ensures my Mac doesn’t start tracking dozens of redundant event streams.

    First, here’s a function to call the FSEventStream methods that stop, invalidate, and release references to the stream:

    /// Stop a file system events stream, invalidate it, and release our
    /// reference to it.
    func cleanupEventStream(_ eventStream: FSEventStreamRef) {
      FSEventStreamStop(eventStream)
      FSEventStreamInvalidate(eventStream)
      FSEventStreamRelease(eventStream)
    }

    Then, a function to create dispatch source objects that watch for termination signals (SIGINT, SIGTERM, SIGHUP) and run our cleanup function. We have to disable the default handlers, or they can terminate the script before we run our cleanup code:

    /// Register cleanup handlers for SIGINT, SIGTERM and SIGHUP that
    /// clean up the event stream when the script exits.
    ///
    /// Returns an array of `DispatchSourceSignal`; the caller must hold
    /// a reference to these in a global variable, or they will be cancelled.
    func registerCleanup(_ eventStream: FSEventStreamRef) -> [DispatchSourceSignal] {
      let signals = [SIGINT, SIGTERM, SIGHUP]
      var sources: [DispatchSourceSignal] = []
    
      for sig in signals {
        let signalSource = DispatchSource.makeSignalSource(signal: sig, queue: .main)
        signalSource.setEventHandler {
          print("\nStopping listener...")
          cleanupEventStream(eventStream)
          exit(0)
        }
    
        signal(sig, SIG_IGN)
        signalSource.activate()
        sources.append(signalSource)
      }
    
      return sources
    }

    Finally, we call this function and hold a reference to the dispatch sources – if not, Swift will deallocate them as unused, and then our cleanup code won’t run.

    let cleanup = registerCleanup(eventStream)

Here’s the complete script:

watch_for_changes.swift
#!/usr/bin/env swift
/// Watch for changed files in a directory, and print a message when
/// something changes.
///
/// Example:
///
///     $ swift scripts/watch_for_changed_files.swift ~/Desktop/ ~/Documents/
///     Listening for changes in /Users/alexwlchan/Desktop/, /Users/alexwlchan/Documents/
///     Detected file change!
///     Detected file change!
///     Detected file change!
///

import Foundation

/// Create a new file system events stream that watches for changes
/// in the given directories.
func createFSEventStream(_ pathsToWatch: [String]) -> FSEventStreamRef {
  let callback: FSEventStreamCallback = { (_, _, _, _, _, _) in
    print("Detected file change!")
    
    // Flush stdout to ensure it's printed immediately
    fflush(stdout)
  }

  let context: UnsafeMutablePointer<FSEventStreamContext>? = nil
  let sinceWhen = FSEventStreamEventId(kFSEventStreamEventIdSinceNow)
  let latency = 0.01
  let flags = FSEventStreamCreateFlags()

  guard let eventStream = FSEventStreamCreate(
    kCFAllocatorDefault, callback, context, pathsToWatch as CFArray, sinceWhen, latency, flags
  ) else {
    fatalError("Failed to create FSEventStream: check your paths or permissions.")
  }

  return eventStream
}

/// Stop a file system events stream, invalidate it, and release our
/// reference to it.
func cleanupEventStream(_ eventStream: FSEventStreamRef) {
  FSEventStreamStop(eventStream)
  FSEventStreamInvalidate(eventStream)
  FSEventStreamRelease(eventStream)
}

/// Register cleanup handlers for SIGINT, SIGTERM and SIGHUP that
/// clean up the event stream when the script exits.
///
/// Returns an array of `DispatchSourceSignal`; the caller must hold
/// a reference to these in a global variable, or they will be cancelled.
func registerCleanup(_ eventStream: FSEventStreamRef) -> [DispatchSourceSignal] {
  let signals = [SIGINT, SIGTERM, SIGHUP]
  var sources: [DispatchSourceSignal] = []

  for sig in signals {
    let signalSource = DispatchSource.makeSignalSource(signal: sig, queue: .main)
    signalSource.setEventHandler {
      print("\nStopping listener...")
      cleanupEventStream(eventStream)
      exit(0)
    }

    signal(sig, SIG_IGN)
    signalSource.activate()
    sources.append(signalSource)
  }

  return sources
}

// Choose which folders to watch.
let home = URL(fileURLWithPath: NSHomeDirectory())
let pathsToWatch = [
  home.appendingPathComponent("Desktop").path,
  home.appendingPathComponent("Documents").path
]

// Create the event stream.
let eventStream = createFSEventStream(pathsToWatch)

// Register cleanup handlers that will run when the script exits.
let cleanup = registerCleanup(eventStream)

// Schedule the event stream and start listening for changes.
let queue = DispatchQueue(label: "net.alexwlchan.watch_file_changes")
FSEventStreamSetDispatchQueue(eventStream, queue)
FSEventStreamStart(eventStream)

print("Listening for changes in \(pathsToWatch.joined(separator: ", "))")

dispatchMain()

When you run this script, you should see it print Detected file change! every time you change a file on your Desktop. Stop the script with ^C.

$ swift watch_for_changes.swift
Listening for changes in /Users/alexwlchan/Desktop, /Users/alexwlchan/Documents
Detected file change!
Detected file change!
Detected file change!
^C
Stopping listener...

This alone is enough to know I should kick off a site rebuild, but a full rebuild takes 10–15s. If I know which file changed, I can do an incremental rebuild, which is much faster. Let’s tackle that next.

Knowing which files/folders changed

If we want to know which file changed, and not merely that a file changed, we need to customise the FSEventStreamCallback. This callback takes six parameters, and the fourth parameter eventPaths is an array of paths where changes occurred.

The type is a bit gnarly: by default it’s a raw C array of raw C strings, or we can set the kFSEventStreamCreateFlagUseCFTypes flag to get a CFArrayRef of CFStringRef objects. (Here CF stands for Core Foundation, one of Apple’s low-level frameworks.) I started by setting the flag, and writing a function that converts the CFArrayRef into a vanilla Swift array:

let flags = FSEventStreamCreateFlags(kFSEventStreamCreateFlagUseCFTypes)

/// Convert a raw pointer from an FSEvent callback into a Swift String.
///
/// FSEventStream must be created with 'kFSEventStreamCreateFlagUseCFTypes'
func convertFSEventPaths(_ eventPaths: UnsafeRawPointer) -> [String] {
  let cfArray = Unmanaged<CFArray>.fromOpaque(eventPaths)
  return cfArray.takeUnretainedValue() as! [String]
}

I can imagine that if you’re working in a very performance-sensitive application, you might skip this step and operate on the C types directly, but that’s not necessary for me.

Then I modified the callback to parse the event paths, and print them one-by-one:

let callback: FSEventStreamCallback = { (_, _, _, eventPaths, _, _) in
  for p in convertFSEventPaths(eventPaths) {
    print("Detected change in \(p)")
  }
  
  // Flush stdout to ensure it's printed immediately
  fflush(stdout)
}

If we add these modifications to our script, it now prints the folder in which a change occurred – for example, if I edit a file /Users/alexwlchan/Desktop/books/Reading List.txt, it prints the parent folder /Users/alexwlchan/Desktop/books.

$ swift watch_for_changed_folders.swift
Listening for changes in /Users/alexwlchan/Desktop, /Users/alexwlchan/Documents
Detected change in /Users/alexwlchan/Desktop/
Detected change in /Users/alexwlchan/Desktop/books/
Detected change in /Users/alexwlchan/Documents/My notes/
^C
Stopping listener...

One thing I noticed is that a single operation can sometimes emit multiple filesystem events – for example, if I save a file in my text editor, that emits two events. I’m guessing that’s one event to write the contents of the file, one event to update the metadata, but I’m not sure.

Because I don’t need fine-grained resolution of filesystem events, I use a Set(…) to de-duplicate events:

let callback: FSEventStreamCallback = { (_, _, _, eventPaths, _, _) in
  for p in Set(convertFSEventPaths(eventPaths)) {
    print("Detected change in \(p)")
  }
  
  // Flush stdout to ensure it's printed immediately
  fflush(stdout)
}

If we’re interested in the individual files rather than the directories, we can use the kFSEventStreamCreateFlagFileEvents flag:

let flags = FSEventStreamCreateFlags(
  kFSEventStreamCreateFlagFileEvents | kFSEventStreamCreateFlagUseCFTypes
)

This shows us every file which is changing, and it often reveals clues about how our computers work under the hood – for example, when I take a screenshot, it gets saved to a hidden file first (.Screenshot) before being moved into its final location.

$ swift watch_for_changed_files.swift
Listening for changes in /Users/alexwlchan/Desktop, /Users/alexwlchan/Documents
Detected change in /Users/alexwlchan/Desktop/greeting.txt
Detected change in /Users/alexwlchan/Desktop/greeting.txt
Detected change in /Users/alexwlchan/Desktop/.Screenshot 2026-05-09 at 10.45.42.png
Detected change in /Users/alexwlchan/Desktop/Screenshot 2026-05-09 at 10.45.42.png
Detected change in /Users/alexwlchan/Desktop/.DS_Store

Here’s the complete script:

watch_for_changed_files.swift
#!/usr/bin/env swift
/// Watch for changed files in a directory, and print the paths of
/// changed files.
///
/// Example:
///
///     $ swift scripts/watch_for_changed_files.swift ~/Desktop/ ~/Documents/
///     Listening for changes in /Users/alexwlchan/Desktop/, /Users/alexwlchan/Documents/
///     Detected change in /Users/alexwlchan/Desktop/greeting.txt
///     Detected change in /Users/alexwlchan/Desktop/booktracker/index.html
///     Detected change in /Users/alexwlchan/Documents/proposal.pdf
///

import Foundation

/// Convert a raw pointer from an FSEvent callback into a Swift String.
///
/// FSEventStream must be created with 'kFSEventStreamCreateFlagUseCFTypes'
func convertFSEventPaths(_ eventPaths: UnsafeRawPointer) -> [String] {
  let cfArray = Unmanaged<CFArray>.fromOpaque(eventPaths)
  return cfArray.takeUnretainedValue() as! [String]
}

/// Create a new file system events stream that watches for changes
/// in the given directories.
func createFSEventStream(_ pathsToWatch: [String]) -> FSEventStreamRef {
  let callback: FSEventStreamCallback = { (_, _, _, eventPaths, _, _) in
    for p in Set(convertFSEventPaths(eventPaths)) {
      print("Detected change in \(p)")
    }
    
    fflush(stdout)
  }

  let context: UnsafeMutablePointer<FSEventStreamContext>? = nil
  let sinceWhen = FSEventStreamEventId(kFSEventStreamEventIdSinceNow)
  let latency = 0.01
  let flags = FSEventStreamCreateFlags(
    kFSEventStreamCreateFlagFileEvents | kFSEventStreamCreateFlagUseCFTypes
  )

  guard let eventStream = FSEventStreamCreate(
    kCFAllocatorDefault, callback, context, pathsToWatch as CFArray, sinceWhen, latency, flags
  ) else {
    fatalError("Failed to create FSEventStream: check your paths or permissions.")
  }

  return eventStream
}

/// Stop a file system events stream, invalidate it, and release our
/// reference to it.
func cleanupEventStream(_ eventStream: FSEventStreamRef) {
  FSEventStreamStop(eventStream)
  FSEventStreamInvalidate(eventStream)
  FSEventStreamRelease(eventStream)
}

/// Register cleanup handlers for SIGINT, SIGTERM and SIGHUP that
/// clean up the event stream when the script exits.
///
/// Returns an array of `DispatchSourceSignal`; the caller must hold
/// a reference to these in a global variable, or they will be cancelled.
func registerCleanup(_ eventStream: FSEventStreamRef) -> [DispatchSourceSignal] {
  let signals = [SIGINT, SIGTERM, SIGHUP]
  var sources: [DispatchSourceSignal] = []

  for sig in signals {
    let signalSource = DispatchSource.makeSignalSource(signal: sig, queue: .main)
    signalSource.setEventHandler {
      print("\nStopping listener...")
      cleanupEventStream(eventStream)
      exit(0)
    }

    signal(sig, SIG_IGN)
    signalSource.activate()
    sources.append(signalSource)
  }

  return sources
}

// Choose which folders to watch.
let home = URL(fileURLWithPath: NSHomeDirectory())
let pathsToWatch = [
  home.appendingPathComponent("Desktop").path,
  home.appendingPathComponent("Documents").path
]

// Create the event stream.
let eventStream = createFSEventStream(pathsToWatch)

// Register cleanup handlers that will run when the script exits.
let cleanup = registerCleanup(eventStream)

// Schedule the event stream and start listening for changes.
let queue = DispatchQueue(label: "net.alexwlchan.watch_file_changes")
FSEventStreamSetDispatchQueue(eventStream, queue)
FSEventStreamStart(eventStream)

print("Listening for changes in \(pathsToWatch.joined(separator: ", "))")

dispatchMain()

This is great if I’m writing a Swift app – but my static site generator is written in Python, so I’d really like to know about these changes in Python. How can I pass this information to a Python script?

Connecting Swift to Python

Building a bridge with stdout and subprocess

I want my Python code to invoke the Swift script as a new process, specify what directories it wants to watch, and read lines from stdout to see which files changed.

This means I have to change the Swift script in three ways:

  1. Allow passing a list of directories to watch as command-line arguments;
  2. Write the Listening for changes and Stopping listener messages to stderr;
  3. Change the Detected change message to print just the path of the changed file.

What’s nice is that there’s nothing Python-specific about this mechanism; you could use this to expose a stream of changed files in any language. (Although in practice I’ll only use it in Python, which I use for the majority of my recreational coding.)

I briefly considered trying to create some Python-Swift bridge, similar to what I did with clonefile() last year, but I couldn’t work out how to do it without bringing in more dependencies. Plus, it would have been a Python-only solution.

Here’s the updated Swift script:

watch_for_changed_files.swift [DIRS...]
#!/usr/bin/env swift
/// Watch for changed files in a directory, and print the paths of
/// changed files to stdout.
///
/// Example:
///
///     $ swift scripts/watch_for_changed_files.swift ~/Desktop/ ~/Documents/
///     Listening for changes in /Users/alexwlchan/Desktop/, /Users/alexwlchan/Documents/
///     /Users/alexwlchan/Desktop/greeting.txt
///     /Users/alexwlchan/Desktop/booktracker/index.html
///     /Users/alexwlchan/Documents/proposal.pdf
///

import Foundation

/// Convert a raw pointer from an FSEvent callback into a Swift String.
///
/// FSEventStream must be created with 'kFSEventStreamCreateFlagUseCFTypes'
func convertFSEventPaths(_ eventPaths: UnsafeRawPointer) -> [String] {
  let cfArray = Unmanaged<CFArray>.fromOpaque(eventPaths)
  return cfArray.takeUnretainedValue() as! [String]
}

/// Create a new file system events stream that watches for changes
/// in the given directories.
func createFSEventStream(_ pathsToWatch: [String]) -> FSEventStreamRef {
  let callback: FSEventStreamCallback = { (_, _, _, eventPaths, _, _) in
    for p in Set(convertFSEventPaths(eventPaths)) {
      print(p)
    }
    
    fflush(stdout)
  }

  let context: UnsafeMutablePointer<FSEventStreamContext>? = nil
  let sinceWhen = FSEventStreamEventId(kFSEventStreamEventIdSinceNow)
  let latency = 0.01
  let flags = FSEventStreamCreateFlags(
    kFSEventStreamCreateFlagFileEvents | kFSEventStreamCreateFlagUseCFTypes
  )

  guard let eventStream = FSEventStreamCreate(
    kCFAllocatorDefault, callback, context, pathsToWatch as CFArray, sinceWhen, latency, flags
  ) else {
    fatalError("Failed to create FSEventStream: check your paths or permissions.")
  }

  return eventStream
}

/// Stop a file system events stream, invalidate it, and release our
/// reference to it.
func cleanupEventStream(_ eventStream: FSEventStreamRef) {
  FSEventStreamStop(eventStream)
  FSEventStreamInvalidate(eventStream)
  FSEventStreamRelease(eventStream)
}

/// Register cleanup handlers for SIGINT, SIGTERM and SIGHUP that
/// clean up the event stream when the script exits.
///
/// Returns an array of `DispatchSourceSignal`; the caller must hold
/// a reference to these in a global variable, or they will be cancelled.
func registerCleanup(_ eventStream: FSEventStreamRef) -> [DispatchSourceSignal] {
  let signals = [SIGINT, SIGTERM, SIGHUP]
  var sources: [DispatchSourceSignal] = []

  for sig in signals {
    let signalSource = DispatchSource.makeSignalSource(signal: sig, queue: .main)
    signalSource.setEventHandler {
      fputs("\nStopping listener...\n", stderr)
      cleanupEventStream(eventStream)
      exit(0)
    }

    signal(sig, SIG_IGN)
    signalSource.activate()
    sources.append(signalSource)
  }

  return sources
}

// Choose which folders to watch.
let pathsToWatch: [String]
if CommandLine.arguments.count > 1 {
  pathsToWatch = Array(CommandLine.arguments.dropFirst())
} else {
  pathsToWatch = ["."]
}

// Create the event stream.
let eventStream = createFSEventStream(pathsToWatch)

// Register cleanup handlers that will run when the script exits.
let cleanup = registerCleanup(eventStream)

// Schedule the event stream and start listening for changes.
let queue = DispatchQueue(label: "net.alexwlchan.watch_file_changes")
FSEventStreamSetDispatchQueue(eventStream, queue)
FSEventStreamStart(eventStream)

fputs("Listening for changes in \(pathsToWatch.joined(separator: ", "))\n", stderr)

dispatchMain()

And here’s the first Python function I came up with:

from collections.abc import Iterator
from pathlib import Path
import subprocess


def watch_for_changed_files(*dirs: str | Path) -> Iterator[Path]:
    """
    Watch one or more directory trees for changes, and yield the paths of
    files when they change.
    """
    cmd = ["swift", "watch_for_changed_files.swift"] + [str(d) for d in dirs]

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, bufsize=1)

    try:
        for line in proc.stdout:
            yield Path(line.strip())
    finally:
        proc.terminate()
        proc.wait()


for p in watch_for_changed_files("/Users/alexwlchan/Desktop"):
    print(p)

This function uses the subprocess module to start my Swift script in a new process, then it reads lines from proc.stdout and yields them to the caller. The caller gets a stream of changed file paths, and doesn’t need to worry about the underlying process.

The function will keep iterating over proc.stdout while stdout stays open, which lasts as long as the Swift process is running. It’s a long-running listener that only stops when I break out of the loop (whether with an explicit break, an exception, or stopping the whole Python script).

The text=True parameter means stdout will be opened in text mode rather than binary mode, and bufsize=1 means the output will be line-buffered, so proc.stdout will be flushed every time the Swift script writes a newline. (This pairs with fflush(stdout) in the Swift script to ensure there’s no buffering delay when I get a filesystem event.)

The try … finally construction ensures the process is stopped and cleaned up correctly when I’m done.

In my live reload script, I can now do an incremental rebuild that only rebuilds parts of the site that have changed. If I’ve changed the base template? Rebuild the entire site. If I’ve edited an article? Only one page needs to change. This is more efficient and makes rebuilds much faster.
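As a sketch, the decision my rebuild loop makes looks something like this (the rules here are illustrative stand-ins, not my generator’s real dependency tracking):

```python
from pathlib import Path


def plan_rebuild(changed_paths: set[Path]) -> str:
    """Decide how much of the site to rebuild for a set of changed files.

    Illustrative rules only: a changed template invalidates every page,
    while a changed article only invalidates its own page.
    """
    if any("templates" in p.parts for p in changed_paths):
        return "full"         # a template change affects every page
    if all(p.suffix == ".md" for p in changed_paths):
        return "incremental"  # each article only affects its own page
    return "full"             # anything else: play it safe
```

The changesets yielded by the watcher plug straight into a function like this: each set of paths becomes one rebuild decision.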

Debouncing with non-blocking I/O and selectors

One problem with this function is that it doesn’t do debouncing. If I change a lot of files at once – say, a bulk find and replace – this function will emit every file separately, kicking off a bunch of redundant rebuilds. If I change ten files at once, I only need to do one rebuild, not ten.

What I’d like to do is coalesce all the changes that have happened since the last rebuild into a single event, then use them to inform the next rebuild. You can do some of this coalescing in Swift by tweaking the latency parameter, but that doesn’t work here because the window I need is variable: the length of a rebuild can vary from a hundred milliseconds to multiple seconds, depending on how much of the site is being rebuilt.

Instead, I want to read everything that’s available in proc.stdout, emit that to the caller, then wait for something else to be written. By default, reading from proc.stdout is a blocking operation – if we call read() and there’s nothing available, it waits until there’s something for us to read. To debounce, we’ll need to change this behaviour.

First, we change proc.stdout to be non-blocking:

import os

os.set_blocking(proc.stdout.fileno(), False)

Then, we need to know when anything has been written to stdout, so we can read all the available output and emit it to the caller. We could poll proc.stdout repeatedly and look for non-empty output, but that would be very inefficient – a better approach would be to use the selectors module and get notified when something gets written.

We create a selector that waits until there’s data waiting to be read from proc.stdout:

import selectors

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)

Then, we call the select() method, which blocks until an event is ready. At that point, we read everything that’s available in proc.stdout and deliver it as a single changeset to the caller. For some use cases you might want to capture and inspect the event, but here it’s enough to know that an event was emitted, and start reading stdout:

while True:
    sel.select()
    
    captured_paths = set()
  
    while True:
        line = proc.stdout.readline()
        if not line:
            break
        captured_paths.add(Path(line.strip()))
  
    yield captured_paths

This changes the signature of the overall function, because now we’re emitting changesets instead of single files. Here’s the updated function:

from collections.abc import Iterator
import os
from pathlib import Path
import selectors
import subprocess


def watch_for_changed_files(*dirs: str | Path) -> Iterator[set[Path]]:
    """
    Watch one or more directory trees for changes, and yield the paths of
    files when they change.
    """
    cmd = ["swift", "scripts/watch_for_changed_files.swift"] + [str(d) for d in dirs]

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, bufsize=1)
    
    os.set_blocking(proc.stdout.fileno(), False)
    
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ)

    try:
        while True:
            sel.select()
    
            captured_paths = set()
    
            while True:
                line = proc.stdout.readline()
                if not line:
                    break
                captured_paths.add(Path(line.strip()))
    
            yield captured_paths
    finally:
        proc.terminate()
        proc.wait()

The result

Here’s a diagram which illustrates the code we’ve written: the FSEvents API emits an event to our Swift script, which prints the file paths to stdout, where they get read by a Python script that kicks off a site rebuild.

Sequence diagram: a file modification triggers an FSEvents notification; the Swift script writes the changed path to stdout and calls fflush(); the pipe signals EVENT_READ, sel.select() unblocks, and the Python script drains every path with readline() and yields the changed files to start a site rebuild. Events that arrive while the site is building accumulate in the pipe, and are drained on the next pass through the loop.

In informal benchmarking, there’s about 2–4 milliseconds between the on-disk modified time of a file and it being picked up by this function. Given file changes only occur when I do something, this is plenty fast enough. (I could click the “save” button as fast as I could, and the code would still have time for a long nap between consecutive clicks.)
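That figure is easy to reproduce: compare a file’s on-disk modification time against the moment you see it (a sketch of the measurement; the exact numbers depend on your machine):

```python
import os
import time


def detection_delay_ms(path: str) -> float:
    """Milliseconds between a file's last modification and now."""
    return (time.time() - os.path.getmtime(path)) * 1000
```

In the watcher loop, calling detection_delay_ms(p) on each yielded path p gives the delay between the write hitting disk and the change reaching Python.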

Both the Swift and the Python code pause until something interesting happens, so this is very efficient – no aggressive polling that could hurt my battery life or SSD longevity.

Closing thoughts

Before I started this script, the only way I knew how to track file changes was by polling, which is undesirable for a number of reasons. I wasn’t sure if I could write an alternative, but now it’s done, I’m proud of the result.

I learnt a lot about topics I only vaguely understood before, including the macOS FSEvents API, how blocking and non-blocking I/O works in Python, and using the selectors module. Explaining it all for this article has cemented that learning, and I understand every line of this code.

I’m pleased I can do this without adding third-party dependencies, especially for something as low-level as filesystem access. Even if I eventually replace this code with a library, I’ll have a better mental model of how it works.

I’m surprised by how much this has improved my workflow. I was waiting 5 to 10 seconds with Jekyll; now, my browser reloads almost instantly with new changes. Everything feels a lot smoother, and it’s renewed my interest in working on the site.

In my next post, I’ll explain how I combine this watcher with HTTP long polling to trigger an automatic browser refresh the moment the rebuild finishes.

[If the formatting of this post looks odd in your feed reader, visit the original article]

Using Playwright to test my static sites

2026-05-02 14:08:53

I build a lot of static websites – including this site and all of my local media archives – and I want to test them. Most of my pages are static HTML and I can write automated tests that analyse the HTML, but for more complex sites I have JavaScript that runs in the browser and modifies the page. The only way to test that functionality is to open the page in a browser, click around, and see what happens. I could do that manually, but it quickly gets tedious.

To automate this process, I’ve been using a testing framework called Playwright, which is designed for this sort of end-to-end testing. It’s a tool that allows you to programmatically control a web browser, look at the contents of a page, and make assertions about what’s there. Playwright can be used to test or script any kind of web app; I’m using it for static sites because those are the only web apps I have.

Playwright is available as a CLI, or there are libraries to use it with TypeScript, Python, .NET, and Java. All my other tests are written in Python, so that’s what I’m using.

Writing a basic test with Playwright

To set up Playwright with Python, you install the playwright library using pip or uv, then install a web browser for Playwright to control. (You can’t use Playwright with the browser you use day-to-day; you need special binaries with control hooks.)

I use Safari as my main browser, and Safari is based on WebKit, so let’s install that:

$ uv pip install playwright
$ python3 -m playwright install webkit

Then we can start writing tests. Here’s a basic test in which Playwright launches WebKit, opens example.com, and checks the text Example domain is visible on the page:

from playwright.sync_api import expect, sync_playwright


def test_basic_playwright() -> None:
    """
    Run a basic test with Playwright: load a web page and check it
    contains the expected text.
    """
    with sync_playwright() as p:
        browser = p.webkit.launch()

        page = browser.new_page()
        page.goto("https://example.com/")
        expect(page.get_by_text("Example domain")).to_be_visible()

        browser.close()

For a larger app, you might run your tests with multiple browsers to check compatibility – Playwright supports lots of other browsers, including Chromium, Firefox, and Mobile Safari in emulation. I’m just testing private sites where I’m the only user, so a single browser is fine.

This test passes in about half a second on my computer. That’s fine for a single test, but it would add up if I had lots of tests, each starting and stopping the browser every time. It would be nice to make that process faster, and to reduce some of the boilerplate as well.

A pair of Playwright fixtures

To reduce the repetition and reuse the browser instance, I have a couple of pytest fixtures to simplify things.

The first is a session-scoped fixture that starts the browser at the start of the test run, and closes it when I’m done:

from collections.abc import Iterator

from playwright.sync_api import Browser, sync_playwright
import pytest


@pytest.fixture(scope="session")
def browser() -> Iterator[Browser]:
    """
    Launch an instance of WebKit to interact with in tests.
    """
    with sync_playwright() as p:
        webkit = p.webkit.launch()
        yield webkit
        webkit.close()

Because this is a session-scoped fixture, it only runs once per test suite – that means the browser is only started once, then the same instance is reused for all the tests. This makes a large test suite significantly faster.

My other fixture is a bit more complicated – it gives you a page to interact with, and at the end of the test it checks the page didn’t have any warnings or errors. This is a strict approach, which helps me spot errors in areas I wasn’t explicitly testing. Here’s the fixture:

from collections.abc import Iterator

from playwright.sync_api import Browser, Page
import pytest


@pytest.fixture(scope="function")
def page(browser: Browser) -> Iterator[Page]:
    """
    Open a new page in the browser.
    
    If there are any errors or warnings when loading the page, the test
    will fail when this fixture is cleaned up.
    """
    p = browser.new_page()

    # Capture anything that gets logged to the console.
    console_messages = []
    p.on("console", lambda msg: console_messages.append(msg))

    # Capture any page errors
    page_errors = []
    p.on("pageerror", lambda err: page_errors.append(err))

    yield p

    # Check there weren't any console errors logged to the page.
    console_errors = [
        msg.text
        for msg in console_messages
        if msg.type == "error" or msg.type == "warning"
    ]
    assert console_errors == []

    # Check there weren't any page errors
    assert page_errors == []

These two fixtures allow for tighter, faster tests, focusing on what the test is actually checking. Here’s the example test, rewritten to use this fixture:

def test_playwright_with_fixture(page: Page) -> None:
    """
    Run a test using my Playwright fixture: load a web page, check it
    contains the expected text, and check it loads without errors.
    """
    page.goto("https://example.com/")
    expect(page.get_by_text("Example domain")).to_be_visible()

I use the page fixture for most tests, where I want to spot any unexpected errors or warnings. If I’m testing error handling specifically, I use the browser fixture and create a new page which isn’t treated as strictly.

Getting file:/// URIs for Playwright

Normally Playwright is used with http: and https: URLs, but my static websites are stored as HTML files on my local disk, and I often open them with file: URLs.

I could spin up a web server in my tests, but that’s extra overhead and might affect the results – there are subtle differences between how browsers handle pages opened with file: vs http:.

To convert file paths to file: URLs, I use the pathname2url function from the urllib.request module. I combine this with os.path.abspath to get a full URL I can pass to Playwright:

>>> from os.path import abspath
>>> from urllib.request import pathname2url
>>> path = "index.html"
>>> pathname2url(abspath(path), add_scheme=True)
'file:///Users/alexwlchan/repos/alexwlchan.net/index.html'
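If your Python doesn’t yet have the add_scheme parameter, you can prepend the scheme yourself. Here’s a small helper along those lines (file_url is my name for it, not a standard function):

```python
from os.path import abspath
from urllib.request import pathname2url


def file_url(path: str) -> str:
    """
    Convert a local file path to a file: URL suitable for page.goto().
    """
    # pathname2url percent-encodes characters that aren't URL-safe,
    # such as spaces in folder names.
    return "file://" + pathname2url(abspath(path))
```

Then a test can open a local page with page.goto(file_url("index.html")), without running a web server.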

Assertions in Playwright

Playwright has a different set of assertion helpers to regular Python tests, and it takes some getting used to – I still have to consult the documentation when I write new tests.

Here are examples of assertions I’ve written using Playwright:

  • Testing that a redirect is working:

    resp = page.goto("https://alexwlchan.net/projects/chives/files/doesnotexist.txt")
    
    assert resp is not None
    assert resp.status == 200
    assert resp.url == "https://alexwlchan.net/projects/chives/files/?missing=doesnotexist.txt"
  • Test that text does or does not appear on a page:

    from playwright.sync_api import expect
    
    page.goto("https://www.example.com")
    
    expect(page.get_by_text("Example Domain")).to_be_visible()
    expect(page.get_by_text("Alex Chan")).not_to_be_visible()

    or:

    assert "Example Domain" in page.content()
    assert "Alex Chan" not in page.content()
  • Locate an element with a CSS selector, and check it does or doesn’t appear on a page:

    page.goto("https://www.example.com")
    
    expect(page.locator("h1")).to_be_visible()
    expect(page.locator("h2.title")).not_to_be_visible()
  • Locate an element, and make assertions about its attributes:

    page.goto("https://www.example.com")
    
    href = page.locator("a").first.get_attribute("href")
    assert href == "https://iana.org/domains/example"
  • Locate an element, and make assertions about the text it contains:

    page.goto("https://www.example.com")
    
    assert page.locator("a").inner_text() == "Learn more"
  • Check that an element with particular inner text is visible on the page:

    page.goto("https://www.example.com/")
    
    expect(page.locator('//h1[text()="Example Domain"]')).to_be_visible()
  • Locate an element immediately following a different element. I’ve used this a couple of times when I have tables or definition lists with a label in one element, and a value in another:

    dt_locator = page.locator('//dt[text()="Profile page:"]')
    next_dd = dt_locator.locator("xpath=following-sibling::*")
    
    assert (
        next_dd.inner_html().strip()
        == '<a href="https://www.flickr.com/photos/nasahqphoto/">NASA HQ PHOTO</a>'
    )
  • Check the number of matching elements on a page; for example, the length of a list:

    page.goto("https://alexwlchan.net/articles/")
    
    assert page.locator("#list_of_posts li").count() >= 10
  • Check the title of the page:

    page.goto("https://www.example.com/")
    
    assert page.title() == "Example Domain"
  • Check the behaviour of the page when JavaScript is disabled:

    context = browser.new_context(java_script_enabled=False)
    page = context.new_page()
    page.goto("https://www.example.com/")  # load the page under test
    
    expect(page.locator("noscript .error")).to_be_visible()
    
    noscript_elem = page.locator("noscript .error")
    assert noscript_elem.inner_text() == "You must enable JavaScript to use this page."

This is just a fraction of what Playwright can do; it can be used to build far more complicated tests that walk through a web app and test multi-step user flows. I’m only using it to make assertions about snippets of JavaScript, but it’s still useful.

For a long time, I told myself that my static sites were simple enough not to need testing, but that didn’t prevent bugs from slipping in, and it limited what I could build. Now I can write proper tests for my sites, I can be more confident I haven’t broken anything, I can experiment faster, and I can try more ambitious ideas.


Building a basic cache with SQLite

2026-04-29 16:24:49

This website is a static website, built with a static site generator I wrote myself. When I’m working on the site locally, I want it to build quickly. The site is relatively small, modern computers are overflowing with power, so I don’t want to be waiting. Rendering all the HTML pages takes about 15 seconds – slow enough that I feel the delay every time.

When I was using Jekyll, everything got much faster when I used the Jekyll cache. There’s a bunch of expensive computation that doesn’t need to be repeated every time I build the site – for example, converting a chunk of Markdown to HTML can be done once and cached forever.

Since I’m no longer using Jekyll, I’ve replaced the Jekyll cache with a basic SQLite cache. I chose SQLite because it’s fast, familiar, and I can use it with the Python standard library.

Every cache entry has three parts: a namespace, key, and value. The namespace groups all entries from a single operation, the key identifies an individual entry, and the value is the output of the expensive computation. For example, in my Markdown-to-HTML cache, the namespace is convert_markdown, the key is the input Markdown, and the value is the output HTML.

Currently I just store basic string values. I could store structured data as JSON or something, but I haven’t needed to yet.

My cache implementation is written in Python, but it’s just a thin wrapper around SQLite queries.

SQLite queries

To create an empty cache:

CREATE TABLE IF NOT EXISTS cache_entries(
  namespace, key, value, date_saved, 
  PRIMARY KEY (namespace, key)
)

This creates an empty table called cache_entries with four columns: the namespace/key/value described previously, and a date_saved column for debugging. I thought it would be useful to record when I saved a cache entry, but I haven’t needed it yet.

The composite PRIMARY KEY ensures I only have one cache entry for a given namespace/key pair.

To store a cache entry, I use a standard SQL INSERT OR REPLACE:

INSERT OR REPLACE INTO cache_entries VALUES (?,?,?,?);

To retrieve a cache entry, I use a standard SELECT:

SELECT value FROM cache_entries WHERE namespace=? AND key=?;

One thing I discovered is that this query can be noticeably slow if the cache value is large, because SQLite has to read many pages to retrieve the value. In some cases I just want to know if a value is cached, not what it actually is – the mere presence of the cache entry allows me to skip some work.

I have another query to detect if the cache has a matching entry, which is much faster because it skips reading the value:

SELECT EXISTS(SELECT 1 FROM cache_entries WHERE namespace=? AND key=?)

Finally, I have a couple of queries to purge the cache – either an individual entry, or for an entire operation:

DELETE FROM cache_entries WHERE namespace=? AND key LIKE ?;
DELETE FROM cache_entries WHERE namespace=?;

Choosing cache keys

For small inputs, I use the input as the cache key.

For large inputs (like the Markdown for an entire blog post), I use the MD5 hash as the key rather than the raw input. That reduces the amount of data written to disk, and should make the database faster. SQLite uses 4KB pages, which is smaller than many of my blog posts. You can store lots of MD5 hashes in a 4KB page, whereas a raw blog post would span multiple pages. That logic is handled outside the caching code.

When the result depends on an external file (like rendering a template), I include the last modified time of the external file in the cache key. When the external file changes, I get a cache miss and recompute the result.
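Sketched as code, those two key-choosing strategies might look like this (the helper names are mine, for illustration):

```python
import hashlib
from pathlib import Path


def markdown_key(markdown: str) -> str:
    # Hash large inputs, so the key is a fixed 32 characters rather
    # than a second copy of the whole blog post.
    return hashlib.md5(markdown.encode("utf8")).hexdigest()


def template_key(name: str, template_path: Path) -> str:
    # Include the template's last-modified time, so editing the
    # template file causes a cache miss and a re-render.
    return f"{name}@{template_path.stat().st_mtime}"
```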

How it’s going

If you’re interested, my cache implementation is public, as are the tests.

The cache has taken some fine-tuning. Cache invalidation is famously difficult, and there are definitely times when I’m not invalidating the cache properly. When I build the live version of the site, I delete the existing cache and start fresh to avoid stale cache entries.

For local development, this has been a big win. Re-rendering all the HTML pages used to take about 15 seconds, but with a warm cache it takes 0.06 seconds. That’s a 200× speedup that I feel every time I hit save, and it’s made working on this site a smoother and more satisfying experience.


HTTP GET requests with the Python standard library

2026-04-24 22:14:03

If you’re doing HTTP in Python, you’re probably using one of three popular libraries: requests, httpx, or urllib3; I’ve used each of them at different times. These libraries are installed with pip, live outside the standard library, and provide more features than the built-in urllib.request module – indeed, the documentation for that module recommends using requests.

Recently I’ve been looking for a new HTTP library, because my previous choice seems abandoned. I was using httpx, but the maintainer has closed issues on the GitHub repo, there’s only been one commit since January, and the last release was over a year ago. The easy choice would be switching to requests or urllib3, but I wondered: can I just use the standard library?

My usage is pretty basic – I have some manually-invoked scripts that make a handful of GET requests to public websites. I don’t have long-running processes; I’m not making thousands of requests at once; I’m not using proxies or authentication. There are plenty of features you can only get from third-party HTTP libraries – from connection pooling to HTTP/2 support – but I don’t need any of them.

I started experimenting, and what I realised is that I don’t miss the features, but I do miss the API.

Here’s how you make a basic GET request with httpx:

import httpx

resp = httpx.get(
    "https://example.com",
    params={"name": "pentagon", "sides": "5"},
    headers={"User-Agent": "Shape-Sorter/1.0"}
)
print(resp.content)

Here’s the same request with urllib.request:

import urllib.parse
import urllib.request

url = "https://example.com"
params = {"name": "pentagon", "sides": "5"}
headers = {"User-Agent": "Shape-Sorter/1.0"}

u = urllib.parse.urlsplit(url)
query = urllib.parse.urlencode(params)
url = urllib.parse.urlunsplit(
    (u.scheme, u.netloc, u.path, query, u.fragment)
)

req = urllib.request.Request(url, headers=headers)

resp = urllib.request.urlopen(req)
print(resp.read())

Verbose! I’ve wrapped it in a helper function in chives, my personal utility library. Here’s the same request a third time:

from chives.fetch import fetch_url

resp = fetch_url(
    "https://example.com",
    params={"name": "pentagon", "sides": "5"},
    headers={"User-Agent": "Shape-Sorter/1.0"}
)
print(resp)

Much cleaner!

The code in chives does have one dependency – certifi, a lightweight package that provides Mozilla’s collection of root certificates.

There are lots of good reasons to use a third-party HTTP library, but I can do everything I need with the standard library and my personal wrapper. Let’s go through how it works.

Building the urllib.request.Request object

The first step is building the Request object. Other HTTP libraries provide helper functions or hide this step for simple requests (notice the basic httpx.get call doesn’t mention an httpx.Request), but for urllib.request we have to do it ourselves. Here’s mine:

import urllib.parse
import urllib.request


QueryParams = dict[str, str] | list[tuple[str, str]]
Headers = dict[str, str]


def build_request(
    url: str,
    *,
    params: QueryParams | None = None,
    headers: Headers | None = None
) -> urllib.request.Request:
    """
    Build a urllib Request, appending query parameters and attaching headers.
    """
    if params is not None:
        params_list = list(params.items()) if isinstance(params, dict) else params

        u = urllib.parse.urlsplit(url)
        query = urllib.parse.parse_qsl(u.query) + params_list
        new_query = urllib.parse.urlencode(query)
        url = urllib.parse.urlunsplit(
            (u.scheme, u.netloc, u.path, new_query, u.fragment)
        )

    req = urllib.request.Request(url, headers=headers or {})

    return req

I can pass params as a dict or as a list of (key, value) tuples; I start by converting it to the list form. This means I can pass the same query parameter multiple times in a URL. That’s admittedly unusual, but I use it on a couple of my websites so I wanted to support it here.

I’m using the urllib.parse module to manipulate the URL and append the query parameters. I parse the initial URL with urlsplit, encode the query parameters, then reassemble the URL with urlunsplit. This preserves any existing query parameters and fragments, and returns a complete URL I can pass to the Request object.
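For example, here’s that round trip on a URL that already has a query string and a fragment, with a new page parameter appended:

```python
import urllib.parse

url = "https://example.com/search?q=fish#results"

u = urllib.parse.urlsplit(url)
# parse_qsl gives us the existing parameters as (key, value) tuples,
# so we can append new ones without losing anything.
query = urllib.parse.parse_qsl(u.query) + [("page", "2")]
new_url = urllib.parse.urlunsplit(
    (u.scheme, u.netloc, u.path, urllib.parse.urlencode(query), u.fragment)
)
# new_url == "https://example.com/search?q=fish&page=2#results"
```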

(If, like me, you’d reach for the urlparse function, you’re showing your age – one thing I learnt during this project is that urlparse is now obsolete, and urlsplit is the replacement.)

This function only handles GET requests, which is all I need for my scripts – but it wouldn’t be difficult to extend it to handle POST requests or form data if the need arises.

This is a pure function, so it’s easy to test thoroughly.

Getting a web page or an API endpoint

In most cases, I just care about getting the response body from the remote server, not the headers or URL – for example, if I’m fetching a web page or an API endpoint. If I want something different in a single script, I’ll eschew my wrapper and use urllib.request directly.

Here’s my fetch_url wrapper:

import certifi
import ssl


def fetch_url(
    url: str,
    *,
    params: QueryParams | None = None,
    headers: Headers | None = None
) -> bytes:
    """
    Fetch the contents of a URL and return the body of the response.
    """
    req = build_request(url, params=params, headers=headers)
    
    ssl_context = ssl.create_default_context(cafile=certifi.where())

    with urllib.request.urlopen(req, context=ssl_context) as resp:
        data: bytes = resp.read()

    return data

The key function is urllib.request.urlopen, which is what actually makes the HTTP request. I’m passing it two parameters: a Request and an SSLContext.

We build the Request using the build_request function.

The SSLContext tells urllib.request which HTTPS certificates it can trust, in this case by pointing to a “cafile” (Certificate Authority file) provided by the certifi library. This file contains a list of trusted root certificates, and all valid HTTPS certificates should eventually chain back to an entry in this list.

The certifi library is a lightweight wrapper around Mozilla’s list of trusted Root Certificates. It’s not in the standard library because it’s important to stay up to date with changes to the list, and you don’t want those changes coupled to Python version releases. Although this exercise is about reducing dependencies, I’m okay with certifi because it’s tiny – you can read the whole thing in less than five minutes. I know what it’s doing.

The urlopen function looks for a 200 OK status code, and throws an HTTPError if it gets an error response from the server. I considered wrapping that in another type, but for now I’m just catching HTTPError.

This function doesn’t set a timeout on HTTP requests. That would be an issue in a lot of contexts, but I’m normally using this from a script I run manually. If something gets stuck, I can stop the script and debug manually.

This function doesn’t support streaming responses; it reads the whole thing into memory at once. That’s fine for web pages or API calls, but I wouldn’t use this to download large files or videos.

There’s a lot of stuff this function doesn’t do, but it works well in all of my scripts, it has a friendly API, and it only has one third-party dependency.

Downloading images with format-based file extensions

As I started using fetch_url in my projects, I realised the one time I often care about response headers is when I’m downloading images. I want the filename to have the appropriate filename extension – .jpg for JPEGs, .png for PNGs, and so on. Sometimes I can guess the file format from the URL, but sometimes I need to inspect the Content-Type header.

I considered exposing the headers from fetch_url, but since I only need the headers for downloading images and that’s a pretty common operation, I decided to make a download_image helper instead.

First, I wrote a helper function that picks a filename extension based on the Content-Type header:

def choose_filename_extension(content_type: str | None) -> str:
    """
    Choose a filename extension for an image downloaded with the given
    Content-Type header.
    """
    if content_type is None:
        raise ValueError(
            "no Content-Type header, cannot determine image format"
        )

    content_type_mapping = {
        "image/jpeg": "jpg",
        "image/png": "png",
        "image/gif": "gif",
        "image/webp": "webp",
    }

    try:
        return content_type_mapping[content_type]
    except KeyError:
        raise ValueError(f"unrecognised Content-Type header: {content_type}")

The mapping contains the four image formats I encounter in practice; it’s easy for me to add more if I try to download a newer format someday.

Then I wrote a function that takes an image URL and an “out prefix” (an initial guess at the path), downloads the image, chooses a file extension, and returns the final path:

from pathlib import Path


def download_image(
    url: str,
    out_prefix: Path,
    *,
    params: QueryParams | None = None,
    headers: Headers | None = None,
) -> Path:
    """
    Download an image from the given URL to the target path, and return
    the path of the downloaded file.

    Add the appropriate file extension, based on the image's Content-Type.

    Throws a FileExistsError if you try to overwrite an existing file.
    """
    req = build_request(url, params=params, headers=headers)

    ssl_context = ssl.create_default_context(cafile=certifi.where())

    with urllib.request.urlopen(req, context=ssl_context) as resp:
        image_data: bytes = resp.read()

    image_format = choose_filename_extension(content_type=resp.headers["content-type"])

    out_path = out_prefix.with_suffix("." + image_format)
    out_path.parent.mkdir(exist_ok=True, parents=True)
    with open(out_path, "xb") as out_file:
        out_file.write(image_data)

    return out_path

The first half of this function is the same as fetch_url; the second half constructs the final path and writes the downloaded image to disk. I like this approach because it allows the caller to specify a meaningful directory and filename without worrying about the filename extension (which is important but not meaningful).

The function creates the output directory if it doesn’t exist, for convenience. Nothing grinds my gears like getting a FileNotFoundError when trying to write to a file in a folder that doesn’t exist. My text editor is smart enough to auto-create missing folders; I want my code to do the same.

I open the file in xb mode to avoid overwriting existing files – if I try to write to an image I’ve already saved, I get a FileExistsError. I find that a useful safety check, and I use exclusive creation mode in a lot of my scripts now.

Packaging and testing

A few months ago, I created a personal utility library chives for dealing with tiny archives, and that was a good place to keep this code.

The HTTP code is in chives.fetch, and the accompanying tests are in test_fetch.py. I’m testing it using the vcrpy library, which knows how to record responses from urllib.request.

I now use this code across all my personal scripts, and it’s been rock-solid. There are lots of good reasons to use Python’s more advanced HTTP libraries, but they’re for use cases I don’t have.


Auditing my local Python packages

2026-04-11 01:00:35

Is it just me, or are supply chain attacks on the rise? It feels like there are more and more incidents where a bad actor publishes a malicious version of a popular package, people install it on their machines, and they get compromised. In March alone, such attacks included the Axios npm package, the Trivy vulnerability scanner, and the LiteLLM Python package.

So far I’ve been unaffected, because the attacks have only involved libraries or packages I don’t use – but it would be foolish to imagine that will always be the case. I have a lot of local Python projects, and I’ve been thinking about how I’d react if a Python package I use was compromised.

The first step is detection: once I know a package version is malicious, how do I know if I’ve installed it? Because I use virtual environments, this turns out to be a non-trivial question.

What are virtual environments?

Virtual environments (or “virtualenvs”) are a tool to create isolated Python environments, each with its own set of installed packages. They allow you to have different dependencies for different projects. For example, if two projects depend on different versions of the same package, you can create per-project virtualenvs, each with the appropriate version.

A virtualenv is stored in a folder that includes symlinks to the global Python interpreter and the packages you’ve installed in the virtualenv. When you “activate” the virtualenv, commands like pip install install packages in the virtualenv folder rather than your global Python.

Here’s an example:

$ # `python3` points to my global interpreter
$ which python3
/Library/Frameworks/Python.framework/Versions/3.13/bin/python3

$ # Create the virtualenv
$ python3 -m venv .venv

$ # Activate the virtualenv, so now `python3` and `pip` commands will
$ # run inside the virtualenv
$ source .venv/bin/activate

$ # `python3` now points to the symlink in the virtualenv
$ which python3
/private/tmp/example/.venv/bin/python3

$ # Pillow will be installed inside the `.venv` folder
$ pip install Pillow

I create a new virtualenv for every Python project, so I have a lot of different virtualenvs on my personal Mac.

To check if I’d installed version X of package Y, I’d have to check each of my virtualenvs. Python itself doesn’t keep a running list of virtualenvs I’ve created, so I have to manage that list myself.

Getting a list of my virtualenvs

I’m very consistent about naming my virtualenvs: the folder is always named .venv. (I actually have a shell function for creating virtualenvs, which enforces that convention.)

This means I can find all the virtualenvs in my home directory with a one-line command:

$ find ~ -type d -name .venv
/Users/alexwlchan/repos/snippets/.venv
/Users/alexwlchan/repos/alexwlchan.net/.venv
/Users/alexwlchan/repos/colour-scheme/.venv

I can similarly search external drives and volumes where I have virtualenvs:

$ find /Volumes/Media/ -type d -name .venv
/Volumes/Media/Screenshots/.venv
/Volumes/Media/Social Media/.venv
/Volumes/Media/Bookmarks/.venv

These commands take about 30 seconds to run – just long enough to be annoying – so I’ve saved the results to a text file:

$ find ~ -type d -name .venv >> ~/.venv_registry
$ find /Volumes/Media/ -type d -name .venv >> ~/.venv_registry

I’ve also modified my shell function that creates virtualenvs to update this file whenever I create a new virtualenv. Now I have an up-to-date list of all my virtualenvs that I can use to search for vulnerable dependencies.
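The author’s actual shell function isn’t shown, but a sketch of the shape might look like this – create the .venv, append its path to the registry, and activate it:

```shell
venv() {
  # Create a virtualenv called .venv in the current directory,
  # record it in the registry, then activate it.
  python3 -m venv .venv
  echo "$PWD/.venv" >> ~/.venv_registry
  source .venv/bin/activate
}
```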

What about Python packages installed outside virtualenvs?

If you run pip install without activating a virtualenv, the packages will get installed in your global Python installation, and they wouldn’t be included in this list. This is generally a bad idea, because you’re back to the problem of different projects using incompatible dependencies.

You can tell pip that it should only use virtualenvs, either with an environment variable or a config file. Once you set up that config, pip will refuse to install packages outside a virtualenv.
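The two forms look like this (the pip.conf location varies by platform; ~/.config/pip/pip.conf is the common XDG path):

```shell
# Environment variable, e.g. in your shell config file:
export PIP_REQUIRE_VIRTUALENV=true

# Or the equivalent setting in pip's config file:
#
#   [global]
#   require-virtualenv = true
```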

Alternatively, if you use uv instead of pip, you can’t install packages outside a virtualenv unless you explicitly pass the --system flag to modify your system Python.

I set PIP_REQUIRE_VIRTUALENV=true in my shell config file, and I use uv, so I don’t have any Python packages installed outside virtualenvs.

Searching my virtualenvs for package versions

Now I have a text file with a list of all my virtualenvs, I can write scripts that run commands in each of them.

For example, here’s a bash script that runs uv pip freeze in every virtualenv to print a list of installed dependencies:

#!/usr/bin/env bash

set -o errexit
set -o nounset

while read -r venv_dir; do
  if ! test -d "$venv_dir"; then
    echo "does not exist: $venv_dir" >&2
    continue
  fi
  
  echo "== $venv_dir =="
  uv pip freeze --python "$venv_dir/bin/python"
  echo ""
done < ~/.venv_registry

Within half a second, I have a complete list of every Python package installed in every virtualenv on my Mac. I dump the output to a text file, and then I can look for compromised package versions – or reassure myself that I don’t have a package installed, not even as an indirect dependency.
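For example, I can grep that dump for a known-bad release – the package name, version, and filename here are all made up:

```shell
# Search the dumped package list for a hypothetical compromised release.
if grep -n 'somepackage==1\.2\.3' all-packages.txt; then
  echo 'Compromised version found!'
else
  echo 'Not installed anywhere.'
fi
```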

I skip missing virtualenvs because they’re probably temporary environments I have yet to clean up from my registry, or virtualenvs on external drives that are currently unmounted.

I like that this script doesn't run the Python interpreter itself, so I won't make things worse if I've already installed a malicious package. In particular, uv is a Rust tool that doesn't run any Python code; it just knows how to inspect Python installations.

For example, with the recent LiteLLM compromise, the attackers installed a .pth file which would run as soon as you started Python, even if you didn’t import LiteLLM. Even a basic python --version or pip freeze would compromise your machine. I could easily modify this script to look for the malicious .pth file in all of my Python environments, without ever running Python.
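As a sketch, that check could reuse the registry too – evil.pth below is a stand-in for whatever filename an advisory names (note that virtualenvs contain some legitimate .pth files, so you'd grep for the specific bad one):

```shell
# List every .pth file in every registered virtualenv,
# without ever starting the Python interpreter.
list_pth_files() {
  while read -r venv_dir; do
    if test -d "$venv_dir"; then
      find "$venv_dir" -name '*.pth'
    fi
  done < ~/.venv_registry
}
```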

Other uses for a virtualenv registry

I originally wrote this to detect compromised packages, but I found other uses:

  • I can find outdated versions of packages, and make sure all my virtualenvs are up-to-date.

  • If I’m trying to stop using a package, I can find any places I’m still using it and remove it. For example, I’m trying to replace some third-party HTTP libraries with the standard library, and these scripts help me find where I’m still using the third-party libraries.

  • I can search all my Python code for places where I use specific functions or features, in a more efficient way than grepping my entire disk. For example, I have a couple of personal utility libraries, and I can see which functions I’m still using and which can be deleted. I do this by searching the parent directory of each .venv path, which is the root of each project.
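A rough sketch of that search, again driven by the registry – the function name and the search term are made up:

```shell
# Search every project root (the parent directory of each .venv)
# for a pattern, skipping the virtualenvs themselves.
search_projects() {
  while read -r venv_dir; do
    grep -rn --include='*.py' --exclude-dir='.venv' \
      "$1" "$(dirname "$venv_dir")" || true
  done < ~/.venv_registry
}
```

Running something like search_projects 'my_old_helper' then prints every remaining call site across all my projects.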

I hope none of the libraries I use are ever compromised, but if they are, I’ll be ready – and in the meantime, this is a useful tool to have around.

[If the formatting of this post looks odd in your feed reader, visit the original article]

Quietly quantum-resistant blogging

2026-04-09 16:28:09

Among the other fun news recently, two papers were published that suggest quantum computers capable of breaking classical public-key cryptography algorithms are much closer than previously believed. What was thought to be years away might now be months.

I found Filippo Valsorda’s post especially helpful in understanding the scale of the risk. We should assume that practical quantum computers are arriving imminently, and roll out quantum-resistant cryptography everywhere, lest we be caught unprepared and leave ourselves at risk.

Google have set a 2029 deadline for moving to quantum-resistant cryptography; Cloudflare have done something similar. (Similar internal discussions are happening at my workplace, but there aren’t any public announcements yet.)

Amidst all the concern, I was pleasantly surprised to discover that my website is already using quantum-resistant cryptography, and I didn’t even realise.

What’s the threat?

All “classical” public-key cryptography relies on hard mathematical problems – operations that are easy to compute in one direction, but incredibly difficult to do in reverse.

For example, it’s easy to multiply two prime numbers together and compute the result, but working out those two prime numbers if you only have the result is impossibly hard. Even for numbers only a few hundred digits long, you could be working until the heat death of the universe and still not have an answer.

Quantum computers work differently to traditional computers, and a sufficiently powerful one can reverse these one-way computations. That would break all of our existing cryptography.

This is the cryptography that underpins almost everything we do online – protecting banks, governments, militaries, and pretty much everyone else. If somebody had a quantum computer that could crack it, all of that information would become readable to them. It would be disastrous.

Small-scale quantum computers already exist in labs, but nothing powerful enough to break public-key cryptography – for now. Researchers have been trying to build bigger and better machines, but until recently they were a long way from building anything this powerful. They’d likely get there eventually, but that was expected to take a long time – late 2030s at the earliest.

Other researchers have been developing new cryptographic algorithms that rely on different maths problems, which can’t be easily broken by quantum computers. These new algorithms are known as “post-quantum cryptography” (PQC) or “quantum-resistant cryptography”. They’ve gradually been formalised as standards, and are starting to be used by our devices. For example, all the popular web browsers now support PQC key exchange for HTTPS connections.

Previously, organisations like the NCSC or NIST recommended a 2035 deadline for migrating to PQC. The idea was to be fully migrated long before quantum computers became a practical threat. That recommendation wasn’t just an abundance of caution – it was meant to head off Harvest Now, Decrypt Later (HNDL) attacks, where an adversary records data encrypted with classical cryptography, and waits until they have a quantum computer that can unlock it. The sooner we migrate to PQC, the more expensive and less valuable such an attack becomes.

Now, it appears we need more urgency.

The two recently published papers narrow the gap between the experimental machines we have today and a practical threat. They describe efficiency improvements that would allow quantum computers to reverse these mathematical operations with far less computing power. It’s become more plausible that somebody could build a “sufficiently powerful” machine within a few years. It’s also becoming a smarter bet to throw lots of money at building one right now, where previously the odds of success were too low to justify the investment.

This is why Google, Cloudflare, and others are moving forward their deadlines for migrating to post-quantum cryptography. The threat has gone from “late 2030s if we’re unlucky” to “early 2030s, maybe sooner”.

What about this blog?

While reading the recent news about this issue, I found Cloudflare’s post-quantum encryption radar, which tells you how many websites are protected using post-quantum cryptography. My website isn’t hosted on Cloudflare but I decided to try it anyway, and I was surprised by the result. I’m already protected!

A form to check if a host supports post-quantum TLS key exchange. I’ve entered my site ‘alexwlchan.net’ and Cloudflare reports that ‘alexwlchan.net:443 is using X25519MLKEM768, which is post-quantum secure’.

I never set up post-quantum cryptography for this site, but it’s enabled anyway, because I’m using Caddy as my web server, and Caddy’s default TLS settings include PQC support. At some point I updated to a new version of Caddy, I got these new defaults, and my site started quietly serving traffic with quantum-resistant cryptography.
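You can run a similar check yourself from the command line. This assumes a reasonably recent OpenSSL – older builds (and the LibreSSL that ships with macOS) don’t print the negotiated group, and you need a version that knows about the ML-KEM hybrids for a post-quantum answer to appear:

```shell
# Ask the server which TLS 1.3 key-exchange group it negotiated.
# A post-quantum answer looks like:
#   Negotiated TLS1.3 group: X25519MLKEM768
openssl s_client -connect alexwlchan.net:443 </dev/null 2>/dev/null \
  | grep 'Negotiated TLS1.3 group' \
  || echo 'Could not determine group (older OpenSSL, or no connection).'
```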

This is exactly what I wanted when I switched to Caddy. I’m not an expert on cryptography, or TLS, or securing servers, so I wanted a web server that would make sensible decisions for me. I’ve mostly been ignorant of post-quantum cryptography and developments in quantum computing, but Caddy was protecting me anyway.

There’s a lot more work to do to use quantum-resistant cryptography everywhere, and recent announcements have made it far more urgent – but we can all sleep easier knowing my little blog is safe from quantum computers.
