2025-09-24 13:08:00
It has been a few months since I left Seeed Studio and started working on my own business, AnyShake Project, an open-source project targeted at Raspberry Shake and other similar products.
As part of building a complete seismograph solution — both hardware and software — I knew from the beginning that integration with SeisComP was a must. It's one of the most widely used tools in professional seismology for real-time monitoring, data acquisition, and event detection. However, most SeisComP plugins are traditionally written in either C++ or Python. And to be honest, after years of juggling build systems, header hell, and Python dependency management, I decided: not this time.
SeisComP's plugin system is designed with C, C++, and Python in mind. There's no official plugin SDK for Go — actually, no SDKs at all — and the documentation is painfully sparse when it comes to writing custom data sources.
To be honest, after getting used to modern developer tools and documentation with quick starts, examples, and real-world use cases, reading the SeisComP docs felt like stepping back in time. Instead of showing you how to get something working quickly, it starts with every possible concept, explained in abstract detail. I get that it's thorough, but when all I wanted was "how do I feed my waveform into the system?"—it was a slog.
I don’t need a philosophical discussion on the nature of a stream; I need a working data pipeline.
So I decided to do it the hard way: dive into the source code, trace how existing modules talk to each other, and build my own minimal setup from scratch.
Go (or Golang) has become my go-to language for building everything from firmware tools to backend services. It compiles to a single binary, it's cross-platform friendly, and concurrency is baked into the language. Most importantly, it allows me to move fast without sacrificing stability or maintainability. So the question became: can I write a fully functional SeisComP plugin in pure Go—no C bindings, no Python wrappers?
Spoiler alert: yes, I can.
Seismograph manufacturers are scattered across the globe, and most of them use their own proprietary protocols. To accommodate this diversity, SeisComP adopts an interface-oriented design pattern: it provides two files, plugin.c and plugin.h, whose functions (such as send_raw3) a plugin calls to hand its data to SeedLink. Once written, a plugin can be registered within SeisComP, enabling dynamic loading of data sources without modifying the SeisComP core.
This is a particularly clever approach because it elegantly decouples third-party data ingestion from the core system. By examining the source code of plugin.c, we can see that the final call chain is as follows:
send_raw3
↓
├─ Prepare and populate header fields (station, channel, time, correction...)
↓
├─ If no data is available:
│ ├─ Send a time packet (PluginRawDataTimePacket)
│ └─ Or send a GAP packet (PluginRawDataGapPacket)
↓
├─ Loop and send data in fragments:
│ ├─ head.packtype = TimePacket (first packet) / DataPacket (subsequent packets)
│ ├─ head.data_size = sample_count
│ └─ Call send_packet()
│ ↓
│ ├─ writen(PLUGIN_FD, head)
│ └─ writen(PLUGIN_FD, dataptr)
↓
└─ Return the total number of bytes sent
Here, PLUGIN_FD is defined as 63, a predefined constant in SeisComP's SeedLink plugin system. It represents a special file descriptor used as an IPC "pipe" for communication between the plugin and SeisComP.
This insight reveals an important fact: if we can replicate this process, we can implement a fully functional SeisComP plugin in Go.
In the case of AnyShake Observer, the software already provides a TCP-based data forwarder. This means that all I need to do is connect to the server, receive and parse the incoming data, and then format and send it according to SeisComP’s expected protocol.
Initially, I considered calling the C functions directly via cgo, but ultimately, due to cross-compilation concerns, I decided to translate the prototypes into pure Go code; after all, they aren't particularly complex. I created a struct that encapsulates the file descriptor and exposed the following methods: SendRaw3, SendFlush3, SendMSeed, SendMSeed2, SendLog3, and SendRawDepoch.
They mirror their C counterparts but are implemented entirely in Go, so they need no cgo support and can be called directly from Go code. The final code is as follows:
import (
"encoding/binary"
"os"
"time"
)
const (
PLUGIN_FD = 63
PLUGIN_MSEED_SIZE = 512
PLUGIN_MAX_MSG_SIZE = 448
PLUGIN_MAX_DATA_BYTES = 4000
)
const (
PLUGIN_RAW_TIME_PACKET = 8
PLUGIN_RAW_PACKET = 9
PLUGIN_RAW_GAP_PACKET = 10
PLUGIN_RAW_FLUSH_PACKET = 11
PLUGIN_LOG_PACKET = 12
PLUGIN_MSEED_PACKET = 13
)
type PluginPacketHeader struct {
PackType uint32
Station [10]byte
Channel [10]byte
Year uint32
Yday uint32
Hour uint32
Minute uint32
Second uint32
Usec uint32
UsecCorrection int32
TimingQuality int32
DataSize int32
}
type SeedLinkPluginIPC struct {
fd *os.File
}
func NewSeedlinkPluginIPC() SeedLinkPluginIPC {
return SeedLinkPluginIPC{
fd: os.NewFile(PLUGIN_FD, "seedlink"),
}
}
func (s *SeedLinkPluginIPC) sendPacket(head *PluginPacketHeader, data []byte) error {
headerBuf := make([]byte, 60)
binary.LittleEndian.PutUint32(headerBuf[0:4], head.PackType)
copy(headerBuf[4:14], head.Station[:])
copy(headerBuf[14:24], head.Channel[:])
binary.LittleEndian.PutUint32(headerBuf[24:28], head.Year)
binary.LittleEndian.PutUint32(headerBuf[28:32], head.Yday)
binary.LittleEndian.PutUint32(headerBuf[32:36], head.Hour)
binary.LittleEndian.PutUint32(headerBuf[36:40], head.Minute)
binary.LittleEndian.PutUint32(headerBuf[40:44], head.Second)
binary.LittleEndian.PutUint32(headerBuf[44:48], head.Usec)
binary.LittleEndian.PutUint32(headerBuf[48:52], uint32(head.UsecCorrection))
binary.LittleEndian.PutUint32(headerBuf[52:56], uint32(head.TimingQuality))
binary.LittleEndian.PutUint32(headerBuf[56:60], uint32(head.DataSize))
if _, err := s.fd.Write(headerBuf); err != nil {
return err
}
if data != nil {
if _, err := s.fd.Write(data); err != nil {
return err
}
}
return nil
}
func (s *SeedLinkPluginIPC) isLeap(y int) bool {
return (y%400 == 0) || (y%4 == 0 && y%100 != 0)
}
func (s *SeedLinkPluginIPC) ldoy(y, m int) int {
doy := [...]int{0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365}
if s.isLeap(y) && m >= 3 {
return doy[m-1] + 1
}
return doy[m-1]
}
func (s *SeedLinkPluginIPC) mdy2dy(month, day, year int) int {
return s.ldoy(year, month) + day - 1
}
func (s *SeedLinkPluginIPC) Close() {
_ = s.fd.Close()
}
func (s *SeedLinkPluginIPC) SendRaw3(station, channel string, t time.Time, usecCorr, timingQuality int, data []int32) error {
const maxSamplesPerPacket = PLUGIN_MAX_DATA_BYTES / 4 // 4000 / 4 = 1000
sent := 0
total := len(data)
first := true
for sent < total {
end := sent + maxSamplesPerPacket
if end > total {
end = total
}
chunk := data[sent:end]
var head PluginPacketHeader
copy(head.Station[:], station)
copy(head.Channel[:], channel)
if first {
head.PackType = PLUGIN_RAW_TIME_PACKET
head.Year = uint32(t.Year())
head.Yday = uint32(s.mdy2dy(int(t.Month()), t.Day(), t.Year()))
head.Hour = uint32(t.Hour())
head.Minute = uint32(t.Minute())
head.Second = uint32(t.Second())
head.Usec = uint32(t.Nanosecond() / 1000)
head.UsecCorrection = int32(usecCorr)
head.TimingQuality = int32(timingQuality)
first = false
} else {
head.PackType = PLUGIN_RAW_PACKET
}
head.DataSize = int32(len(chunk))
dataBytes := make([]byte, len(chunk)*4)
for i, v := range chunk {
binary.LittleEndian.PutUint32(dataBytes[i*4:(i+1)*4], uint32(v))
}
if err := s.sendPacket(&head, dataBytes); err != nil {
return err
}
sent = end
}
return nil
}
func (s *SeedLinkPluginIPC) SendFlush3(station, channel string) error {
var head PluginPacketHeader
copy(head.Station[:], station)
copy(head.Channel[:], channel)
head.PackType = PLUGIN_RAW_FLUSH_PACKET
head.DataSize = 0
return s.sendPacket(&head, nil)
}
func (s *SeedLinkPluginIPC) SendMSeed(station string, data []byte) error {
if len(data) != PLUGIN_MSEED_SIZE {
return nil
}
var head PluginPacketHeader
copy(head.Station[:], station)
head.PackType = PLUGIN_MSEED_PACKET
head.DataSize = int32(len(data))
return s.sendPacket(&head, data)
}
func (s *SeedLinkPluginIPC) SendMSeed2(station, channel string, seq int, data []byte) error {
if len(data) != PLUGIN_MSEED_SIZE {
return nil
}
var head PluginPacketHeader
copy(head.Station[:], station)
copy(head.Channel[:], channel)
head.PackType = PLUGIN_MSEED_PACKET
head.TimingQuality = int32(seq)
head.DataSize = int32(len(data))
return s.sendPacket(&head, data)
}
func (s *SeedLinkPluginIPC) SendLog3(station string, t time.Time, msg string) error {
var head PluginPacketHeader
copy(head.Station[:], station)
head.PackType = PLUGIN_LOG_PACKET
head.Year = uint32(t.Year())
head.Yday = uint32(s.mdy2dy(int(t.Month()), t.Day(), t.Year()))
head.Hour = uint32(t.Hour())
head.Minute = uint32(t.Minute())
head.Second = uint32(t.Second())
head.Usec = uint32(t.Nanosecond() / 1000)
data := []byte(msg)
head.DataSize = int32(len(data))
return s.sendPacket(&head, data)
}
func (s *SeedLinkPluginIPC) SendRawDepoch(station, channel string, depoch float64, usecCorr, timingQuality int, data []int32) error {
sec := int64(depoch)
usec := int((depoch - float64(sec)) * 1e6)
t := time.Unix(sec, int64(usec)*1000).UTC()
return s.SendRaw3(station, channel, t, usecCorr, timingQuality, data)
}
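A quick way to sanity-check the 60-byte header layout used by sendPacket is to decode a header back into its fields. The following is only a sketch under stated assumptions: decodedHeader, cString, and decodeHeader are hypothetical helpers that are not part of plugin.c or AnyShake Nexus, and the byte order is assumed to be the same little-endian order that sendPacket writes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// headerSize is the packed header size: 4 + 10 + 10 + 9*4 bytes.
const headerSize = 60

// decodedHeader mirrors the fields written by sendPacket (hypothetical type).
type decodedHeader struct {
	PackType       uint32
	Station        string
	Channel        string
	Year           uint32
	Yday           uint32
	Hour           uint32
	Minute         uint32
	Second         uint32
	Usec           uint32
	UsecCorrection int32
	TimingQuality  int32
	DataSize       int32
}

// cString returns the bytes before the first NUL as a string.
func cString(b []byte) string {
	if i := bytes.IndexByte(b, 0); i >= 0 {
		b = b[:i]
	}
	return string(b)
}

// decodeHeader parses the first 60 bytes of buf using the same offsets
// that sendPacket uses when encoding.
func decodeHeader(buf []byte) (decodedHeader, error) {
	if len(buf) < headerSize {
		return decodedHeader{}, errors.New("buffer shorter than header")
	}
	le := binary.LittleEndian
	return decodedHeader{
		PackType:       le.Uint32(buf[0:4]),
		Station:        cString(buf[4:14]),
		Channel:        cString(buf[14:24]),
		Year:           le.Uint32(buf[24:28]),
		Yday:           le.Uint32(buf[28:32]),
		Hour:           le.Uint32(buf[32:36]),
		Minute:         le.Uint32(buf[36:40]),
		Second:         le.Uint32(buf[40:44]),
		Usec:           le.Uint32(buf[44:48]),
		UsecCorrection: int32(le.Uint32(buf[48:52])),
		TimingQuality:  int32(le.Uint32(buf[52:56])),
		DataSize:       int32(le.Uint32(buf[56:60])),
	}, nil
}

func main() {
	// Build a header by hand, the same way sendPacket lays it out.
	buf := make([]byte, headerSize)
	binary.LittleEndian.PutUint32(buf[0:4], 13) // PLUGIN_MSEED_PACKET
	copy(buf[4:14], "SHAKE")                    // example station code
	copy(buf[14:24], "EHZ")                     // example channel code
	binary.LittleEndian.PutUint32(buf[56:60], 512)

	h, err := decodeHeader(buf)
	fmt.Println(h.PackType, h.Station, h.Channel, h.DataSize, err)
}
```

In a unit test, the same decoder can be pointed at the bytes written to an in-memory pipe instead of file descriptor 63, which makes it possible to check the encoder without a running SeedLink instance.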
In the AnyShake Project, the AnyShake Explorer, which serves as the data acquisition device, can experience gradual clock drift due to tiny deviations in its crystal oscillator. To address this, I chose to use SendMSeed to provide the data, ensuring that each data packet carries a fully controllable timestamp.
The function SendMSeed only accepts pre-encoded MiniSEED packets, so I leveraged mseedio, a pure Go MiniSEED encoding library I previously developed for the AnyShake Project, to handle this task.
It's important to note that, according to the implementation in plugin.c, each MiniSEED packet passed to SendMSeed must have a length equal to PLUGIN_MSEED_SIZE, i.e., 512 bytes. Through extensive testing, I found that 100 samples reliably fit into a single 512-byte record, so I set 100 samples as the upper limit for a single MiniSEED data block. Before encoding, I check the data length and slice it appropriately. I also encapsulated the MiniSEED encoding logic to make it easier to use.
import (
"fmt"
"time"
"github.com/bclswl0827/mseedio"
)
const MINISEED_CHUNK_SAMPLES = 100
type MiniSeedData struct {
Station string
Network string
Location string
Channel string
Timestamp int64
SampleRate int
Data []int32
}
func NewMiniSeedData(timestamp time.Time, station, network, location, channel string, sampleRate int, data []int32) MiniSeedData {
return MiniSeedData{
Timestamp: timestamp.UnixMilli(),
Station: station,
Network: network,
Location: location,
Channel: channel,
SampleRate: sampleRate,
Data: data,
}
}
func (m *MiniSeedData) chunkInt32Slice(data []int32, chunkSamples int) [][]int32 {
var chunks [][]int32
for i := 0; i < len(data); i += chunkSamples {
end := min(i+chunkSamples, len(data))
chunks = append(chunks, data[i:end])
}
return chunks
}
func (m *MiniSeedData) EncodeChunk(sequenceNumber int) ([][]byte, error) {
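if m.SampleRate <= 0 {
return nil, fmt.Errorf("invalid sample rate: %d", m.SampleRate)
}
var buf [][]byte
for i, c := range m.chunkInt32Slice(m.Data, MINISEED_CHUNK_SAMPLES) {
var miniseed mseedio.MiniSeedData
if err := miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST); err != nil {
return nil, err
}
// Offset of this chunk in milliseconds. Multiply before dividing so that
// sample rates that do not divide 1000 evenly are not truncated per sample.
offsetMs := int64(i) * MINISEED_CHUNK_SAMPLES * 1000 / int64(m.SampleRate)
startTime := time.UnixMilli(m.Timestamp + offsetMs).UTC()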
if err := miniseed.Append(c, &mseedio.AppendOptions{
ChannelCode: m.Channel,
StationCode: m.Station,
LocationCode: m.Location,
NetworkCode: m.Network,
SampleRate: float64(m.SampleRate),
SequenceNumber: fmt.Sprintf("%06d", sequenceNumber),
StartTime: startTime,
}); err != nil {
return nil, err
}
// The record length is stored as a power-of-two exponent: 2^9 = 512 bytes (PLUGIN_MSEED_SIZE).
for j := 0; j < len(miniseed.Series); j++ {
miniseed.Series[j].BlocketteSection.RecordLength = 9
}
msData, err := miniseed.Encode(mseedio.OVERWRITE, mseedio.MSBFIRST)
if err != nil {
return nil, err
}
buf = append(buf, msData)
}
return buf, nil
}
The above covers the core code analysis. For establishing the TCP connection, the main program, and some necessary template files, I have open-sourced everything in the AnyShake Nexus repository. The code is simple and easy to read, so I won’t go into further detail here.
I configured the AnyShake Nexus plugin as a data source in SeisComP.
After starting the SeedLink module and checking the logs, it is clear that the system correctly recognizes the data stream coming from the plugin.
Opening a terminal and running scrttv shows the real-time waveform from the plugin. Mission accomplished.
2025-01-21 09:31:22
In mid-October, after three rounds of assessments and interviews, I finally got a job at a company that specializes in open-source IoT and embedded development.
As an application engineer at this company, my responsibilities include building libraries, maintaining product documentation on the wiki, and creating customized demos for various edge computing devices. Just a few days into the role, I learned about a legacy issue with one of the products. Specifically, the RS485 interfaces had a critical flaw that prevented the port from properly switching the data direction.
The R&D team had previously attempted to address this issue at the application layer by developing a utility called rs485_DE in C. This tool would open the original RS485 port (e.g., /dev/ttyAMA2), create a new virtual serial port, and monitor data on the virtual port. It would then automatically toggle the chip's $\overline{\text{RE}}$ pin based on the data flow direction. However, this approach introduced a new issue: the higher the baud rate, the more frequently data loss occurred. The root cause was the tool's reliance on polling to check the buffers, coupled with its use of the libgpiod library to manipulate GPIOs for flow control, which was too slow to meet the performance requirements.
Adding to the complexity, the RS485 transceiver chip used in the product was the TPT7487. On this chip, the $\overline{\text{RE}}$ pin is responsible for controlling the chip's mode - either receive or transmit. However, instead of being connected to dedicated DTR pins, the $\overline{\text{RE}}$ pin was wired to several general-purpose GPIO pins on the Compute Module 4 (CM4). Unfortunately, the kernel driver did not configure these GPIOs as DTR pins, further limiting the ability to implement a hardware-based solution for automatic data direction control.
After thorough discussions and evaluations, the team concluded that replacing all affected products was the only viable solution. Despite the significant cost - an estimated $60,000 - the company prioritized customer satisfaction and product reliability. With no better technical solution available, the recall decision was made to uphold the company's reputation and ensure long-term trust from customers.
Out of curiosity, I requested the schematic of the product and, after some analysis, I surprisingly discovered what I believed could be a potential software solution to the issues. That marked the beginning of a new chapter in addressing the problem.
From the above schematic and the TPT7487 datasheet, it became clear that when the CM4RS485n_DTR (where n = [1, 2, 3]) pins are low, the transceiver enters receive mode; when they are high, the chip switches to transmit mode.
Since this product is based on the CM4, it is highly likely that most users would download and install the official Raspberry Pi OS image, which meant that modifying and redistributing the Raspberry Pi Linux kernel driver would be impractical. This approach would introduce significant inconvenience for users - especially since we don't even have an independent apt source!
Now the problem was clear: a solution that required kernel changes or custom distribution wouldn't work in this environment.
As I continued to think about the issue, I realized that I could write an out-of-tree kernel module to control the $\overline{\text{RE}}$ pin and thus achieve automatic toggling of the transceiver mode. However, the challenge was to implement this solution without relying on polling, which had been a major flaw in the original approach. At the time, I didn't have much experience with kernel modules, but I decided to take a chance and try to solve it.
I recalled an idea I had seen previously in server monitoring tools, where file changes could be tracked by hooking into the syscall layer. Inspired by this concept, I figured I could apply the same principle to monitor the UART calls and control the $\overline{\text{RE}}$ pin's state. Instead of polling, I thought I could hook into the relevant functions that handle data transmission and reception, allowing me to change the pin's state dynamically in response to the data flow direction.
By leveraging this approach, I aimed to develop a solution that was both efficient and non-intrusive, providing a clean software-level fix without needing to modify the underlying kernel or disrupt the user experience.
As I began exploring potential solutions to hook syscalls, I encountered a significant challenge with the existing syscall hook implementations. Many of the examples I found were designed for kernel versions prior to 5.7. This limitation stemmed from the fact that, starting from kernel 5.7, the kallsyms_lookup_name function was no longer exported, which meant the approach of directly searching for kernel symbols by name wouldn't work.
After researching alternatives, I discovered that the kprobe mechanism could be used to achieve the same goal. The kprobe is a powerful tool in the Linux kernel that allows for dynamic instrumentation of functions at runtime. By using kprobe, I could locate and interact with kernel symbols (such as syscalls) even on newer kernels where kallsyms_lookup_name is not available. This discovery provided a way to work around the limitations of newer kernel versions.
With this in mind, I proceeded to write a kernel module to implement syscall hooking using kprobe. The following code demonstrates how to hook the mkdir syscall, replacing it with a custom version. The module dynamically locates kernel symbols using kprobe, bypassing the restriction imposed by kernel 5.7 and above.
By the way, the following code is designed for the ARM64 architecture. For x86_64, the full demo is available on GitHub.
#include <asm/unistd.h>
#include <linux/kallsyms.h>
#include <linux/kernel.h>
#include <linux/kprobes.h>
#include <linux/module.h>
#include <linux/version.h>
#include <linux/vmalloc.h>
#define MODULE_NAME "syscall_hook"
#define LOG_PREFIX MODULE_NAME ": "
MODULE_DESCRIPTION("A simple module that hooks the `mkdir` function, works on kernel 5.7 and higher.");
MODULE_AUTHOR("Joshua Lee <[email protected]>");
MODULE_LICENSE("Dual MIT/GPL");
MODULE_VERSION("0.0.1");
// For Linux 5.7 and higher versions, `kallsyms_lookup_name` is not exported anymore.
// But we can use `kprobe` to find the address of `kallsyms_lookup_name`.
// The `custom_kallsyms_lookup_name` represents the address of `kallsyms_lookup_name` internally.
// For kernels below 5.7, `custom_kallsyms_lookup_name` simply calls `kallsyms_lookup_name`.
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 7, 0)
typedef unsigned long (*kallsyms_lookup_name_t)(const char* name);
static kallsyms_lookup_name_t custom_kallsyms_lookup_name;
#else
#define custom_kallsyms_lookup_name kallsyms_lookup_name
#endif
// `fixup_kallsyms_lookup_name` extracts the address of `kallsyms_lookup_name` from `kprobe`.
// It returns 0 on success, -EFAULT on failure.
static int fixup_kallsyms_lookup_name(void) {
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 7, 0)
struct kprobe kp = {.symbol_name = "kallsyms_lookup_name"};
int result = register_kprobe(&kp);
if (result < 0) {
printk(KERN_ERR LOG_PREFIX "Failed to register kprobe, returned code: %d\n", result);
return result;
}
custom_kallsyms_lookup_name = (kallsyms_lookup_name_t)kp.addr;
unregister_kprobe(&kp);
if (!custom_kallsyms_lookup_name) {
printk(KERN_ERR LOG_PREFIX "Failed to get address for `kallsyms_lookup_name`\n");
return -EFAULT;
}
printk(KERN_DEBUG LOG_PREFIX "Got address for `kallsyms_lookup_name`: %p\n", custom_kallsyms_lookup_name);
return 0;
#else
return 0;
#endif
}
struct vm_struct* (*custom_find_vm_area)(const void* base_addr); // `custom_find_vm_area` points to the address of `find_vm_area` function.
int (*custom_set_memory_rw)(unsigned long base_addr, int num_pages); // `custom_set_memory_rw` points to the address of `set_memory_rw` function.
int (*custom_set_memory_ro)(unsigned long base_addr, int num_pages); // `custom_set_memory_ro` points to the address of `set_memory_ro` function.
static unsigned long syscall_target_base_addr; // `syscall_target_base_addr` is the base address of target syscall.
typedef long (*syscall_fn_t)(const struct pt_regs* regs); // `syscall_fn_t` is the type of any syscall.
static syscall_fn_t prototype_mkdir; // `prototype_mkdir` is backup of original `mkdir` function.
static unsigned long* syscall_table; // `syscall_table` points to the address of `sys_call_table`.
// `custom_mkdir` is our custom `mkdir` function.
// Do whatever you want here and return the result.
static long custom_mkdir(const struct pt_regs* regs) {
char filename[512] = {0};
// On ARM64, `mkdirat(dfd, pathname, mode)` receives `pathname` in x1.
const char __user* pathname = (const char __user*)regs->regs[1];
// `strncpy_from_user` stops at the terminating NUL instead of always reading 512 bytes.
if (strncpy_from_user(filename, pathname, sizeof(filename) - 1) < 0) {
printk(KERN_ERR LOG_PREFIX "Failed to get file name from user\n");
return -EFAULT;
}
printk(KERN_INFO LOG_PREFIX "`mkdir` function called by user, file name: %s\n", filename);
return prototype_mkdir(regs); // Call original `mkdir`.
}
static int module_init_fn(void) {
if (fixup_kallsyms_lookup_name() < 0) {
return -1;
}
custom_set_memory_ro = (void*)custom_kallsyms_lookup_name("set_memory_ro");
if (custom_set_memory_ro == NULL) {
printk(KERN_ERR LOG_PREFIX "Could not find `set_memory_ro`\n");
return -1;
}
custom_set_memory_rw = (void*)custom_kallsyms_lookup_name("set_memory_rw");
if (custom_set_memory_rw == NULL) {
printk(KERN_ERR LOG_PREFIX "Could not find `set_memory_rw`\n");
return -1;
}
custom_find_vm_area = (void*)custom_kallsyms_lookup_name("find_vm_area");
if (custom_find_vm_area == NULL) {
printk(KERN_ERR LOG_PREFIX "Could not find `find_vm_area`\n");
return -1;
}
syscall_table = (unsigned long*)custom_kallsyms_lookup_name("sys_call_table");
if (syscall_table == NULL) {
printk(KERN_ERR LOG_PREFIX "Could not find `sys_call_table`\n");
return -1;
}
prototype_mkdir = (syscall_fn_t)syscall_table[__NR_mkdirat]; // Create backup of original `mkdir` function.
syscall_target_base_addr = ((unsigned long)(syscall_table + __NR_mkdirat)) & PAGE_MASK;
struct vm_struct* area = custom_find_vm_area((void*)syscall_target_base_addr);
if (area == NULL) {
printk(KERN_ERR LOG_PREFIX "Could not find vm area\n");
return -1;
}
area->flags |= VM_ALLOC;
int result = custom_set_memory_rw(syscall_target_base_addr, 1);
if (result != 0) {
printk(KERN_ERR LOG_PREFIX "Failed to set memory to read/write mode\n");
return -1;
}
syscall_table[__NR_mkdirat] = (unsigned long)custom_mkdir; // Replace original `mkdir` with our custom one.
result = custom_set_memory_ro(syscall_target_base_addr, 1);
if (result != 0) {
printk(KERN_ERR LOG_PREFIX "Failed to set memory to read-only mode\n");
return -1;
}
printk(KERN_INFO LOG_PREFIX "Hooked `mkdir` function successfully (%p => %p)\n", prototype_mkdir, custom_mkdir);
return 0;
}
static void module_end_fn(void) {
int result = custom_set_memory_rw(syscall_target_base_addr, 1);
if (result != 0) {
printk(KERN_ERR LOG_PREFIX "Failed to set memory to read/write mode\n");
return;
}
syscall_table[__NR_mkdirat] = (unsigned long)prototype_mkdir; // Restore original `mkdir` function.
result = custom_set_memory_ro(syscall_target_base_addr, 1);
if (result != 0) {
printk(KERN_ERR LOG_PREFIX "Failed to set memory to read-only mode\n");
return;
}
printk(KERN_INFO LOG_PREFIX "Unhooked `mkdir` function successfully (%p => %p)\n", custom_mkdir, prototype_mkdir);
}
module_init(module_init_fn);
module_exit(module_end_fn);
Cheers!
Over the next few days, I broke down the task into smaller steps and gradually implemented the required functionality:
1. Hook uart_write in the kernel module to raise the GPIO before transmission
2. Wait for uart_write completion and lower the GPIO after transmission
Thanks to an article I wrote in 2023 on using mmap for GPIO control, I successfully implemented GPIO control in a Raspberry Pi kernel module. The final code is as follows; I have also made it public in my GitHub repository.
#include <asm/io.h>
#include <linux/delay.h>
#include <linux/kprobes.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/tty.h>
#include <linux/workqueue.h>
#ifndef MODULE_NAME
#define MODULE_NAME "r1000v1_rs485_autoflow"
#endif
#ifndef MODULE_VER
#define MODULE_VER "custom"
#endif
MODULE_DESCRIPTION("This module fixes RS-485 flow control issue on reComputer R1000 v1.0 by hooking `uart_write` function.");
MODULE_AUTHOR("Joshua Lee <[email protected]>");
MODULE_LICENSE("Dual MIT/GPL");
MODULE_VERSION(MODULE_VER);
#define BCM2711_GPIO_BASE (0xfe000000 + 0x200000)
volatile unsigned int* GPFSEL0; // Function selector for GPIO 0-9, for CM4_RS485_1_DTR at GPIO_6.
volatile unsigned int* GPFSEL1; // Function selector for GPIO 10-19, for CM4_RS485_2_DTR at GPIO_17.
volatile unsigned int* GPFSEL2; // Function selector for GPIO 20-29, for CM4_RS485_3_DTR at GPIO_24.
volatile unsigned int* GPSET0; // Register to set GPIO 0-31 to high.
volatile unsigned int* GPCLR0; // Register to set GPIO 0-31 to low.
volatile unsigned int* GPIO_PUP_PDN_CNTRL_REG0; // Register to set pull up/down control of GPIO 0-15.
volatile unsigned int* GPIO_PUP_PDN_CNTRL_REG1; // Register to set pull up/down control of GPIO 16-31.
static void rs485_dtr_init(void) {
// Re-map GPIO registers, offsets are given in the datasheet
GPFSEL0 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0x00, 4);
GPFSEL1 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0x04, 4);
GPFSEL2 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0x08, 4);
GPSET0 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0x1c, 4);
GPCLR0 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0x28, 4);
GPIO_PUP_PDN_CNTRL_REG0 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0xe4, 4);
GPIO_PUP_PDN_CNTRL_REG1 = (volatile unsigned int*)ioremap(BCM2711_GPIO_BASE + 0xe8, 4);
// Set CM4_RS485_1_DTR at GPIO_6 to output mode (GPFSEL0[20:18]), no internal pull
*GPFSEL0 &= ~(7 << 18);
*GPFSEL0 |= (1 << 18);
*GPIO_PUP_PDN_CNTRL_REG0 &= ~(3 << 12);
*GPIO_PUP_PDN_CNTRL_REG0 |= (0 << 12);
// Set CM4_RS485_2_DTR at GPIO_17 to output mode (GPFSEL1[23:21]), no internal pull
*GPFSEL1 &= ~(7 << 21);
*GPFSEL1 |= (1 << 21);
*GPIO_PUP_PDN_CNTRL_REG1 &= ~(3 << 2);
*GPIO_PUP_PDN_CNTRL_REG1 |= (0 << 2);
// Set CM4_RS485_3_DTR at GPIO_24 to output mode (GPFSEL2[14:12]), no internal pull
*GPFSEL2 &= ~(7 << 12);
*GPFSEL2 |= (1 << 12);
*GPIO_PUP_PDN_CNTRL_REG1 &= ~(3 << 16);
*GPIO_PUP_PDN_CNTRL_REG1 |= (0 << 16);
// Set all DTR pins to low
*GPCLR0 = (1 << 6) | (1 << 17) | (1 << 24);
}
static void rs485_dtr_deinit(void) {
// Set all DTR pins to low
*GPCLR0 = (1 << 6) | (1 << 17) | (1 << 24);
// Unmap GPIO registers
iounmap(GPFSEL0);
iounmap(GPFSEL1);
iounmap(GPFSEL2);
iounmap(GPSET0);
iounmap(GPCLR0);
iounmap(GPIO_PUP_PDN_CNTRL_REG0);
iounmap(GPIO_PUP_PDN_CNTRL_REG1);
}
static bool rs485_is_builtin_dev(struct tty_struct* tty) {
// `ttyAMA` is for built-in RS-485 interface
return strcmp(tty->driver->name, "ttyAMA") == 0;
}
static void rs485_dtr_set(int dev_num, bool enable) {
switch (dev_num) {
case 2: // ttyAMA2
if (enable) {
*GPSET0 = (1 << 6);
} else {
*GPCLR0 = (1 << 6);
}
break;
case 3: // ttyAMA3
if (enable) {
*GPSET0 = (1 << 17);
} else {
*GPCLR0 = (1 << 17);
}
break;
case 5: // ttyAMA5
if (enable) {
*GPSET0 = (1 << 24);
} else {
*GPCLR0 = (1 << 24);
}
break;
}
}
static int rs485_get_dev_num(struct tty_struct* tty) {
if (tty->index == 2 || tty->index == 3 || tty->index == 5) {
return tty->index;
}
return -EINVAL;
}
struct rs485_worker_t {
struct delayed_work work;
struct tty_struct* tty;
};
static struct workqueue_struct* rs485_worker_queues[3]; // 3 queues for 3 RS-485 interfaces (ttyAMA2, ttyAMA3, ttyAMA5)
static int rs485_get_worker_index(int dev_num) {
if (dev_num == 2) {
return 0;
} else if (dev_num == 3) {
return 1;
} else if (dev_num == 5) {
return 2;
}
return -EINVAL;
}
static void rs485_worker_oncomplete(struct work_struct* work) {
struct rs485_worker_t* rs485_worker = container_of(work, struct rs485_worker_t, work.work);
// Wait until data is sent out, then set DTR to low
if (rs485_worker->tty->ops->write_room(rs485_worker->tty) == 0) {
schedule_delayed_work(&rs485_worker->work, usecs_to_jiffies(1));
return;
}
// Wait for some time before setting DTR to low; the delay is based on the baud rate
// Each character takes (10 * 1000 / baudrate) milliseconds
// Plus 60ns for the transceiver mode switch (mentioned in the TPT7487 datasheet)
int baudrate = tty_get_baud_rate(rs485_worker->tty);
msleep((10 * 1000) / baudrate);
ndelay(60);
rs485_dtr_set(rs485_worker->tty->index, false);
kfree(rs485_worker);
}
static void hook_uart_write_onreturn(struct kprobe* p, struct pt_regs* regs, unsigned long flags) {
struct tty_struct* tty = (struct tty_struct*)regs->regs[0];
if (rs485_is_builtin_dev(tty)) {
int dev_num = rs485_get_dev_num(tty);
int queue_index = rs485_get_worker_index(dev_num);
if (queue_index != -EINVAL) {
// kprobe handlers must not sleep, so allocate with GFP_ATOMIC
struct rs485_worker_t* rs485_worker = kmalloc(sizeof(*rs485_worker), GFP_ATOMIC);
if (rs485_worker) { // Check the allocation before touching the worker
rs485_worker->tty = tty;
INIT_DELAYED_WORK(&rs485_worker->work, rs485_worker_oncomplete);
queue_delayed_work(rs485_worker_queues[queue_index], &rs485_worker->work, 0);
}
}
}
}
static int hook_uart_write_onstart(struct kprobe* p, struct pt_regs* regs) {
struct tty_struct* tty = (struct tty_struct*)regs->regs[0];
if (rs485_is_builtin_dev(tty)) {
int dev_num = rs485_get_dev_num(tty);
rs485_dtr_set(dev_num, true);
}
return 0;
}
static unsigned long get_fn_addr(const char* symbol_name) {
struct kprobe temp_kp = {.symbol_name = symbol_name};
unsigned long fn_addr;
int ret = register_kprobe(&temp_kp);
if (ret < 0) {
return ret; // Negative errno; do not unregister a probe that was never registered
}
fn_addr = (unsigned long)temp_kp.addr;
unregister_kprobe(&temp_kp);
if (fn_addr == 0) {
return -EFAULT;
}
return fn_addr;
}
#define LOG_PREFIX MODULE_NAME ": "
struct kprobe hook_uart_write;
static int module_init_fn(void) {
rs485_dtr_init();
// Create worker queues for each RS-485 interface
rs485_worker_queues[0] = create_singlethread_workqueue(MODULE_NAME "_worker_queue_2");
if (rs485_worker_queues[0] == NULL) {
printk(KERN_ERR LOG_PREFIX "Failed to create worker queue for ttyAMA2\n");
return -ENOMEM;
}
rs485_worker_queues[1] = create_singlethread_workqueue(MODULE_NAME "_worker_queue_3");
if (rs485_worker_queues[1] == NULL) {
printk(KERN_ERR LOG_PREFIX "Failed to create worker queue for ttyAMA3\n");
return -ENOMEM;
}
rs485_worker_queues[2] = create_singlethread_workqueue(MODULE_NAME "_worker_queue_5");
if (rs485_worker_queues[2] == NULL) {
printk(KERN_ERR LOG_PREFIX "Failed to create worker queue for ttyAMA5\n");
return -ENOMEM;
}
// Hook `uart_write` function
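unsigned long target_fn_addr = get_fn_addr("uart_write");
// `get_fn_addr` returns an unsigned value, so a `< 0` test could never be true
if (target_fn_addr == 0 || IS_ERR_VALUE(target_fn_addr)) {
printk(KERN_ERR LOG_PREFIX "Failed to get address for `uart_write`, returned code: %ld\n", (long)target_fn_addr);
return target_fn_addr ? (int)(long)target_fn_addr : -EFAULT;
}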
hook_uart_write.addr = (kprobe_opcode_t*)target_fn_addr;
hook_uart_write.pre_handler = (void*)hook_uart_write_onstart;
hook_uart_write.post_handler = (void*)hook_uart_write_onreturn;
int ret = register_kprobe(&hook_uart_write);
if (ret < 0) {
printk(KERN_ERR LOG_PREFIX "Failed to register kprobe for `uart_write`, returned code: %d\n", ret);
return ret;
}
printk(KERN_INFO LOG_PREFIX "RS-485 interface has been hooked successfully\n");
return 0;
}
static void module_exit_fn(void) {
    unregister_kprobe(&hook_uart_write);
    for (int i = 0; i < sizeof(rs485_worker_queues) / sizeof(rs485_worker_queues[0]); i++) {
        if (rs485_worker_queues[i]) {
            destroy_workqueue(rs485_worker_queues[i]);
        }
    }
    rs485_dtr_deinit();
    printk(KERN_INFO LOG_PREFIX "RS-485 interface has been unhooked successfully\n");
}
module_init(module_init_fn);
module_exit(module_exit_fn);
After deploying the module and testing extensively across various baud rates, the issue was successfully resolved. There was no need to replace products or recall shipments, saving the company $60,000 and sparing customers from potential inconvenience. The software-level solution proved efficient, effective, and sustainable.
This experience reaffirmed the importance of thinking creatively when addressing challenges. Even as a newcomer, I was able to leverage open-source tools and innovative thinking to tackle a seemingly insurmountable problem. It’s a reminder that in technology, persistence and curiosity often yield unexpected and rewarding outcomes.
2023-05-20 13:08:00
When it comes to controlling the GPIO pins on a development board, many developers have tried accessing them from user space (e.g. through sysfs). But this approach may not perform well, because it is not as fast as you might think; at least, that was vividly reflected in one of my recent projects.
Recently I took over a maker-level project built around the Raspberry Pi, in which the Pi continuously polls the level of a certain FPGA output at high speed to determine whether the data is "ready". However, as the price of the Raspberry Pi rose, I had to look for an affordable alternative, and I finally settled on the Orange Pi One, which uses the Allwinner H3 and has 512 MB of RAM. That's enough for my needs.
In the original Raspberry Pi implementation, the state of the GPIO pins was obtained by accessing the GPIO registers directly through memory mapping (mmap). Different SoCs have different internal register sets and addresses, so porting the project to the Allwinner H3 required some significant changes to the code.
This article explains how I understand the concept of mmap and how I analysed the datasheet provided by Allwinner. At the end, I'll provide complete examples of how to use mmap to access GPIO pins in C, Go, and Python.
In simple terms, mmap allows a physical memory region to be mapped to the application's virtual memory space. This enables direct manipulation of CPU registers at the application level.
In conventional development, we typically use the Linux generic sysfs interface to control GPIOs. Through this interface, GPIOs can be configured to output specific signal levels or read external signals input to the GPIO. However, this method is only suitable for scenarios where speed requirements are not stringent. When high-speed GPIO access is required, the sysfs method becomes inefficient. This is because sysfs relies on file I/O operations for GPIO control. Each operation involves accessing the file system and making system calls, which introduces performance overhead.
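To make the overhead concrete, here is a minimal Python sketch of a single read through the legacy sysfs GPIO interface. The function name `sysfs_read_level` and the `root` parameter are illustrative assumptions (the parameter only exists so the function can be pointed at a different directory), and the pin is assumed to have been exported already.

```python
import os

# Default location of the legacy sysfs GPIO interface
SYSFS_GPIO_ROOT = "/sys/class/gpio"


def sysfs_read_level(pin: int, root: str = SYSFS_GPIO_ROOT) -> int:
    """Read one GPIO level through sysfs (hypothetical helper).

    The pin is assumed to be exported already, so that
    <root>/gpio<pin>/value exists.
    """
    path = os.path.join(root, f"gpio{pin}", "value")
    # Every single read costs an open(), a read() and a close() system call,
    # plus a path lookup inside the kernel's virtual file system.
    with open(path, "r") as f:
        return int(f.read().strip())
```

Calling this in a tight polling loop repeats all of those steps for every sample, which is the cost the register-level approach avoids.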
To eliminate this additional performance cost and break through the file I/O bottleneck, an alternative approach is to bypass these layers. The mmap method allows direct operations on GPIOs in physical memory, theoretically achieving faster GPIO access.
It is known that the GPIOs on a development board are essentially part of the CPU pins. Since these pins can be controlled programmatically, there must be corresponding registers within the CPU. These registers are mapped to specific physical address ranges that remain constant. By using mmap to operate on the CPU's internal registers, it becomes possible to control specific GPIOs in this manner.
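The idea can be tried safely before touching real hardware. The sketch below maps an ordinary file instead of /dev/mem and writes a 32-bit value through the mapping; the helper names `poke_u32` and `peek_u32` are made up for this illustration, and the little-endian layout is an assumption that matches typical ARM Linux boards.

```python
import mmap
import os
import struct


def poke_u32(path: str, offset: int, value: int) -> None:
    """Write a little-endian 32-bit value at `offset` through a shared mapping."""
    fd = os.open(path, os.O_RDWR)
    try:
        size = os.fstat(fd).st_size
        with mmap.mmap(fd, size, mmap.MAP_SHARED,
                       mmap.PROT_READ | mmap.PROT_WRITE) as m:
            # A plain memory store: no write() system call is issued here
            struct.pack_into("<I", m, offset, value)
    finally:
        os.close(fd)


def peek_u32(path: str, offset: int) -> int:
    """Read the same 32-bit value back through ordinary file I/O."""
    with open(path, "rb") as f:
        f.seek(offset)
        return struct.unpack("<I", f.read(4))[0]
```

With /dev/mem in place of the regular file, the same kind of store lands in a hardware register rather than in file contents.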
To control GPIOs via mmap, the process involves the following 5 steps:
/dev/mem device file.
Among these steps, step 2 requires consulting the CPU vendor's datasheet to identify the relevant details.
The datasheet provided by Allwinner runs to 618 pages, making it impractical to read in full. Instead, relevant sections can be found by using Ctrl + F to search for the following keywords:
And the key findings from the datasheet are given as follows:
The register base address is 0x01C20000.
The GPIO register base address is 0x01C20800.
The GPIO registers therefore sit at an offset of 0x0800 from the register base address.
On page 318, section 4.22.2 (Port Controller Register), the GPIO register configurations are detailed. After excluding interrupt-related registers, the useful registers for GPIO configuration are as follows:
| Register | Offset | Description | Remarks |
|---|---|---|---|
| Pn_CFG0 | n*0x24 + 0x00 | GPIO_n configuration register 0 for setting up pin mode | First determine the port (e.g. GPIO_A) and pin number (e.g. 6) whose mode you want to set. Then, in the "Pn Configure Register x" tables that follow this table in the datasheet (where n = [0…6] and x = [0…3]), find the corresponding "bit" and "value" for the mode you want to set. The code n also refers to one of the GPIO ports A, C, D, E, F, G, L. |
| Pn_CFG1 | n*0x24 + 0x04 | GPIO_n configuration register 1 for setting up pin mode | See Pn_CFG0 |
| Pn_CFG2 | n*0x24 + 0x08 | GPIO_n configuration register 2 for setting up pin mode | See Pn_CFG0 |
| Pn_CFG3 | n*0x24 + 0x0C | GPIO_n configuration register 3 for setting up pin mode | See Pn_CFG0 |
| Pn_DAT | n*0x24 + 0x10 | GPIO_n data register for accessing the state of GPIO pin | In input mode, the corresponding bit indicates the pin status; in output mode, the pin status is the same as the corresponding bit |
| Pn_DRV0 | n*0x24 + 0x14 | GPIO_n drive capability register 0, used to configure the output drive capability of GPIO n | |
| Pn_DRV1 | n*0x24 + 0x18 | GPIO_n drive capability register 1, used to configure the output drive capability of GPIO n | |
| Pn_PUL0 | n*0x24 + 0x1C | GPIO_n Pull-up / Pull-down register 0, used to configure the pull-up / pull-down of GPIO n | |
| Pn_PUL1 | n*0x24 + 0x20 | GPIO_n Pull-up / Pull-down register 1, used to configure the pull-up / pull-down of GPIO n | |
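The offset column of the table is a simple formula, and a small Python sketch can make it easier to check by hand. The dictionary and the function name `register_offset` are illustrative assumptions; the numbers come straight from the table above, with port index 0 standing for GPIO_A.

```python
# Offsets of each register inside one port block, taken from the table
REGISTER_OFFSETS = {
    "CFG0": 0x00, "CFG1": 0x04, "CFG2": 0x08, "CFG3": 0x0C,
    "DAT": 0x10,
    "DRV0": 0x14, "DRV1": 0x18,
    "PUL0": 0x1C, "PUL1": 0x20,
}

# Distance between the register blocks of two consecutive ports (n * 0x24)
PORT_STRIDE = 0x24


def register_offset(port_index: int, register: str) -> int:
    """Offset of a port register relative to the GPIO register base address."""
    return port_index * PORT_STRIDE + REGISTER_OFFSETS[register]
```

For example, `register_offset(0, "DAT")` gives 0x10 for the GPIO_A data register, and `register_offset(1, "CFG0")` gives 0x24 for the first configuration register of the next port.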
Although we know the GPIO register base address is 0x01C20800, controlling a specific port such as GPIO_A also requires its offset from the GPIO base address.
From page 319 of the datasheet, section 4.22.2.1 (PA Configure Register 0), we learn that the GPIOA registers start at an offset of 0x00 relative to the GPIO base address and that the last GPIOA register begins at 0x01C20820. That makes nine 32-bit registers with a total width of 0x24 bytes, which matches the n*0x24 stride in the table.
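One practical detail follows from these addresses: mmap only accepts file offsets that are multiples of the page size, and 0x01C20800 is not page-aligned on a system with 4 KiB pages. This is presumably why the programs later in this article map from 0x01C20000 and then add 0x0800. A minimal sketch of that split, where the helper name `split_for_mmap` is an assumption for illustration:

```python
import mmap

REGISTER_BASE = 0x01C20000  # register base address from the datasheet
GPIO_OFFSET = 0x0800        # offset of the GPIO registers from that base


def split_for_mmap(phys_addr: int, page_size: int = mmap.PAGESIZE):
    """Split a physical address into (page-aligned base, offset inside the page)."""
    page_base = phys_addr & ~(page_size - 1)
    return page_base, phys_addr - page_base
```

With 4 KiB pages, `split_for_mmap(REGISTER_BASE + GPIO_OFFSET, 4096)` returns the pair (0x01C20000, 0x800): the first value is what goes into the mmap call, the second is added to the mapped pointer afterwards.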
Given this layout, the following C structure can represent the GPIO register set:
typedef struct {
    volatile uint32_t config[4];
    volatile uint32_t data;
    volatile uint32_t driver[2];
    volatile uint32_t pull[2];
} gpio_t;
In this structure, the keyword volatile tells the compiler that the values of these members can change outside the program's control (here, by the hardware), so every read and write must actually reach memory and must not be cached in a CPU register or optimized away.
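A quick way to check that such a structure really lines up with the register table is to inspect its size and field offsets. The sketch below does this with Python's ctypes; the class name `GpioPort` and the helper `field_offsets` are illustrative assumptions, while the field layout mirrors the C structure above.

```python
import ctypes


class GpioPort(ctypes.Structure):
    """Mirror of the register block of one GPIO port."""
    _fields_ = [
        ("config", ctypes.c_uint32 * 4),  # Pn_CFG0 .. Pn_CFG3
        ("data", ctypes.c_uint32),        # Pn_DAT
        ("driver", ctypes.c_uint32 * 2),  # Pn_DRV0, Pn_DRV1
        ("pull", ctypes.c_uint32 * 2),    # Pn_PUL0, Pn_PUL1
    ]


def field_offsets() -> dict:
    """Byte offset of every field, for comparison with the datasheet table."""
    return {name: getattr(GpioPort, name).offset for name, _ in GpioPort._fields_}
```

The structure is 0x24 bytes long, and the offsets come out as 0x00, 0x10, 0x14 and 0x1C, the same values as Pn_CFG0, Pn_DAT, Pn_DRV0 and Pn_PUL0 in the table.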
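To configure GPIO_A20 as an output and drive it high, refer to the following details from the datasheet:
Write 0x01 to the pin's field in the configuration register to configure it as an output.
Write 0x01 to the pin's bit in the data register.
To configure GPIO_A8 as an input and enable the pull-up resistor, refer to the following details from the datasheet:
Write 0x00 to the pin's field in the configuration register to configure it as an input.
Write 0x01 to the pin's field in the pull-up / pull-down register.
With the foundational knowledge covered, we can now write programs to solve the problem.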
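Before running anything against real registers, the bit arithmetic can be checked on plain integers. The sketch below assumes the same field widths the programs in this article use (4 bits per pin in the configuration registers, 2 bits per pin in the pull registers) and computes the positions instead of looking them up in a table; all three function names are made up for this illustration.

```python
def set_field(value: int, shift: int, width: int, field: int) -> int:
    """Return `value` with the `width`-bit field at `shift` replaced by `field`."""
    mask = ((1 << width) - 1) << shift
    # Clear the old field first, then OR in the new one (read-modify-write)
    return (value & ~mask & 0xFFFFFFFF) | ((field << shift) & mask)


def config_position(pin: int):
    """(register index, bit shift) of a pin's mode field: 8 pins per register."""
    return pin // 8, (pin % 8) * 4


def pull_position(pin: int):
    """(register index, bit shift) of a pin's pull field: 16 pins per register."""
    return pin // 16, (pin % 16) * 2
```

For pin 20, `config_position(20)` yields register 2 and shift 16, so writing mode 0x01 into a register that previously held 0x77777777 gives `set_field(0x77777777, 16, 4, 0x01) == 0x77717777`. Only the one field changes; the neighbouring pins keep their configuration.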
These examples demonstrate how to use C, Go, and Python to toggle an LED on GPIO_A21 and read the level of GPIO_A8, which is printed to the terminal.
Note: Since GPIO_A8 has pull-up enabled, its default level is high unless the pin is grounded. Similarly, if pull-down were enabled, the default level would be low unless the pin is connected to the power supply.
Here is the C version, where the gpio_t structure is the same as defined earlier. The set_output function configures a GPIO pin as an output, the set_input function configures it as an input, the set_pull function configures its pull-up / pull-down resistor, the set_level function sets the pin level, and the get_level function reads the pin level.
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>

// GPIO_A configuration register list
// { Register number, Register bit }
const int GPIO_A_CONFIG[22][2] = {
    {0, 0},   // PA0
    {0, 4},   // PA1
    {0, 8},   // PA2
    {0, 12},  // PA3
    {0, 16},  // PA4
    {0, 20},  // PA5
    {0, 24},  // PA6
    {0, 28},  // PA7
    {1, 0},   // PA8
    {1, 4},   // PA9
    {1, 8},   // PA10
    {1, 12},  // PA11
    {1, 16},  // PA12
    {1, 20},  // PA13
    {1, 24},  // PA14
    {1, 28},  // PA15
    {2, 0},   // PA16
    {2, 4},   // PA17
    {2, 8},   // PA18
    {2, 12},  // PA19
    {2, 16},  // PA20
    {2, 20},  // PA21
};

// Base address of registers
#define ALLWINNER_H3_BASE 0x01C20000
// Offset of GPIO_A relative to ALLWINNER_H3_BASE
#define GPIO_PA_OFFSET 0x0800
// Size of the memory region to be mapped using mmap
#define MMAP_SIZE 0x1000

// GPIO mode configuration
enum GPIO_MODE {
    INPUT = 0,
    OUTPUT,
};

// GPIO level configuration
enum GPIO_LEVEL {
    LOW = 0,
    HIGH,
};

// GPIO pull-up/pull-down configuration
enum GPIO_PULL {
    PULL_OFF = 0,
    PULL_UP,
    PULL_DOWN,
};

// GPIO port register type
typedef struct {
    volatile uint32_t config[4];
    volatile uint32_t data;
    volatile uint32_t driver[2];
    volatile uint32_t pull[2];
} gpio_t;
// Configure the specified pin of GPIO_A as output
void set_output(gpio_t* gpio, int pin) {
    // Get the register number and register bit
    int reg = GPIO_A_CONFIG[pin][0];
    int bit = GPIO_A_CONFIG[pin][1];
    // Clear the previous configuration (unsigned mask, so bit 28 is safe)
    gpio->config[reg] &= ~(0x0FU << bit);
    // Set as output mode
    gpio->config[reg] |= ((uint32_t)OUTPUT << bit);
}

// Configure the specified pin of GPIO_A as input
void set_input(gpio_t* gpio, int pin) {
    // Get the register number and register bit
    int reg = GPIO_A_CONFIG[pin][0];
    int bit = GPIO_A_CONFIG[pin][1];
    // Clear the previous configuration
    gpio->config[reg] &= ~(0x0FU << bit);
    // Set as input mode
    gpio->config[reg] |= ((uint32_t)INPUT << bit);
}

// Configure pull-up/pull-down for the specified pin of GPIO_A
void set_pull(gpio_t* gpio, int pin, int pull) {
    // Get the register number
    int reg = pin / 16;
    // Get the register bit
    int bit = (pin % 16) * 2;
    // Clear the previous configuration
    gpio->pull[reg] &= ~(0x03U << bit);
    // Set pull-up/pull-down configuration
    gpio->pull[reg] |= (uint32_t)pull << bit;
}

// Set the level of the specified pin of GPIO_A
void set_level(gpio_t* gpio, int pin, int level) {
    switch (level) {
        case HIGH:
            gpio->data |= (1U << pin);
            return;
        case LOW:
            gpio->data &= ~(1U << pin);
            return;
        default:
            return;
    }
}

// Read the level of the specified pin of GPIO_A
int get_level(gpio_t* gpio, int pin) {
    // Only the data register is read; the pin configuration is left untouched
    return (gpio->data >> pin) & 0x01;
}
int main(void) {
    // Open /dev/mem device file in read-write mode
    int mem = open("/dev/mem", O_RDWR | O_SYNC);
    if (mem < 0) {
        perror("open /dev/mem");
        return -1;
    }
    // Map the register into memory
    char* reg = (char*)mmap(NULL, MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
                            mem, ALLWINNER_H3_BASE);
    if (reg == MAP_FAILED) {
        perror("mmap");
        close(mem);
        return -1;
    }
    // Apply the offset to the GPIO_A register type
    gpio_t* gpio = (gpio_t*)&reg[GPIO_PA_OFFSET];
    // Set GPIO_A21 as output
    set_output(gpio, 21);
    // Set GPIO_A8 as input
    set_input(gpio, 8);
    // Enable pull-up for GPIO_A8
    set_pull(gpio, 8, PULL_UP);
    // Blink the LED and read the level
    for (;;) {
        // Toggle GPIO_A21 LED
        set_level(gpio, 21, HIGH);
        usleep(500000);
        set_level(gpio, 21, LOW);
        usleep(500000);
        // Read and print the level of GPIO_A8
        int level = get_level(gpio, 8);
        printf("GPIO_A8 level: %d\n", level);
    }
    // Unmap the memory (not reached while the loop above runs forever).
    // munmap() must receive the address returned by mmap(), not the offset pointer.
    munmap(reg, MMAP_SIZE);
    close(mem);
    return 0;
}
Here is the Go language implementation. In this version, memory mapping is performed using the syscall.Mmap() and syscall.Munmap() functions.
Additionally, unlike the C version where pointer casting is done with (gpio_t *) and dereferencing operators *, the Go version uses the unsafe.Pointer type and the unsafe.Pointer() function for pointer conversion, allowing direct memory address manipulation.
package main

import (
	"fmt"
	"os"
	"syscall"
	"time"
	"unsafe"
)

// GPIO_A configuration register list
// { register number, register bit }
var GPIO_A_CONFIG = [22][2]int{
	{0, 0},  // PA0
	{0, 4},  // PA1
	{0, 8},  // PA2
	{0, 12}, // PA3
	{0, 16}, // PA4
	{0, 20}, // PA5
	{0, 24}, // PA6
	{0, 28}, // PA7
	{1, 0},  // PA8
	{1, 4},  // PA9
	{1, 8},  // PA10
	{1, 12}, // PA11
	{1, 16}, // PA12
	{1, 20}, // PA13
	{1, 24}, // PA14
	{1, 28}, // PA15
	{2, 0},  // PA16
	{2, 4},  // PA17
	{2, 8},  // PA18
	{2, 12}, // PA19
	{2, 16}, // PA20
	{2, 20}, // PA21
}

const (
	// Base address of registers
	ALLWINNER_H3_BASE = 0x01C20000
	// Offset of GPIO_A relative to ALLWINNER_H3_BASE
	GPIO_PA_OFFSET = 0x0800
	// Size of the region to be mapped when using the mmap function
	MMAP_SIZE = 0x1000
)

// GPIO mode configuration
const (
	INPUT  = 0
	OUTPUT = 1
)

// GPIO level configuration
const (
	LOW  = 0
	HIGH = 1
)

// Pull-up/down configuration
const (
	PULL_OFF  = 0
	PULL_UP   = 1
	PULL_DOWN = 2
)

// GPIO port register type
type gpio_t struct {
	config [4]uint32
	data   uint32
	driver [2]uint32
	pull   [2]uint32
}
// Configure the specified GPIO_A pin as output mode
func setOutput(gpio *gpio_t, pin int) {
	// Get the register number and register bit
	reg := GPIO_A_CONFIG[pin][0]
	bit := GPIO_A_CONFIG[pin][1]
	// Clear the original configuration
	gpio.config[reg] &= ^(0x0F << bit)
	// Set to output mode
	gpio.config[reg] |= OUTPUT << bit
}

// Configure the specified GPIO_A pin as input mode
func setInput(gpio *gpio_t, pin int) {
	// Get the register number and register bit
	reg := GPIO_A_CONFIG[pin][0]
	bit := GPIO_A_CONFIG[pin][1]
	// Clear the original configuration
	gpio.config[reg] &= ^(0x0F << bit)
	// Set to input mode
	gpio.config[reg] |= INPUT << bit
}

// Configure pull-up/down for the specified GPIO_A pin
func setPull(gpio *gpio_t, pin, pull int) {
	// Get the register number
	reg := pin / 16
	// Get the register bit
	bit := (pin % 16) * 2
	// Clear the original configuration
	gpio.pull[reg] &= ^(0x03 << bit)
	// Set pull-up/down
	gpio.pull[reg] |= uint32(pull) << bit
}

// Set the level of the specified GPIO_A pin
func setLevel(gpio *gpio_t, pin, level int) {
	switch level {
	case HIGH:
		gpio.data |= 1 << pin
	case LOW:
		gpio.data &= ^(1 << pin)
	}
}

// Read the level of the specified GPIO_A pin
func getLevel(gpio *gpio_t, pin int) int {
	// Only the data register is read; the pin configuration is left untouched
	return int((gpio.data >> pin) & 0x01)
}
func main() {
	// Open /dev/mem device file in read-write mode
	mem, err := os.OpenFile("/dev/mem", os.O_RDWR|os.O_SYNC, 0)
	if err != nil {
		fmt.Printf("Failed to open /dev/mem: %v\n", err)
		return
	}
	defer mem.Close()
	// Map the registers to memory
	reg, err := syscall.Mmap(int(mem.Fd()), ALLWINNER_H3_BASE, MMAP_SIZE, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	if err != nil {
		fmt.Printf("Failed to mmap: %v\n", err)
		return
	}
	defer syscall.Munmap(reg)
	// Apply the mapped address offset to the GPIO_A register type
	gpio := (*gpio_t)(unsafe.Pointer(&reg[GPIO_PA_OFFSET]))
	// Set GPIO_A21 as output mode
	setOutput(gpio, 21)
	// Set GPIO_A8 as input mode
	setInput(gpio, 8)
	// Enable pull-up for GPIO_A8
	setPull(gpio, 8, PULL_UP)
	// Blink the LED and read the level
	for {
		// Blink the GPIO_A21 LED
		setLevel(gpio, 21, HIGH)
		time.Sleep(time.Millisecond * 500)
		setLevel(gpio, 21, LOW)
		time.Sleep(time.Millisecond * 500)
		// Read and print the level of GPIO_A8
		level := getLevel(gpio, 8)
		fmt.Printf("GPIO_A8 level: %d\n", level)
	}
}
The Python implementation is similar to the Go implementation, using the mmap library to perform memory mapping and unmapping.
from mmap import mmap, MAP_SHARED, PROT_READ, PROT_WRITE
from os import open, close, O_RDWR, O_SYNC
from ctypes import Structure, c_uint32
from typing import Type
from time import sleep
from sys import exit

# GPIO_A configuration register list
# { register number, register bit }
GPIO_A_CONFIG = [
    [0, 0],   # PA0
    [0, 4],   # PA1
    [0, 8],   # PA2
    [0, 12],  # PA3
    [0, 16],  # PA4
    [0, 20],  # PA5
    [0, 24],  # PA6
    [0, 28],  # PA7
    [1, 0],   # PA8
    [1, 4],   # PA9
    [1, 8],   # PA10
    [1, 12],  # PA11
    [1, 16],  # PA12
    [1, 20],  # PA13
    [1, 24],  # PA14
    [1, 28],  # PA15
    [2, 0],   # PA16
    [2, 4],   # PA17
    [2, 8],   # PA18
    [2, 12],  # PA19
    [2, 16],  # PA20
    [2, 20],  # PA21
]

# Base address of the registers
ALLWINNER_H3_BASE = 0x01C20000
# Offset of GPIO_A relative to ALLWINNER_H3_BASE
GPIO_PA_OFFSET = 0x0800
# Size of the region to be mapped when using the mmap function
MMAP_SIZE = 0x1000

# GPIO mode configuration
INPUT = 0
OUTPUT = 1

# GPIO level configuration
LOW = 0
HIGH = 1

# Pull-up/down configuration
PULL_OFF = 0
PULL_UP = 1
PULL_DOWN = 2


# GPIO port register type
class gpio_t(Structure):
    _fields_ = [
        ("config", c_uint32 * 4),
        ("data", c_uint32),
        ("driver", c_uint32 * 2),
        ("pull", c_uint32 * 2),
    ]
# Configure the specified GPIO_A pin as output mode
def set_output(gpio: gpio_t, pin: int) -> None:
    # Get the register number and register bit
    reg, bit = GPIO_A_CONFIG[pin]
    # Clear the original configuration
    gpio.config[reg] &= ~(0x0F << bit)
    # Set as output mode
    gpio.config[reg] |= (OUTPUT << bit)


# Configure the specified GPIO_A pin as input mode
def set_input(gpio: gpio_t, pin: int) -> None:
    # Get the register number and register bit
    reg, bit = GPIO_A_CONFIG[pin]
    # Clear the original configuration
    gpio.config[reg] &= ~(0x0F << bit)
    # Set as input mode
    gpio.config[reg] |= (INPUT << bit)


# Configure pull-up/down for the specified GPIO_A pin
def set_pull(gpio: gpio_t, pin: int, pull: int) -> None:
    # Get the register number (integer division)
    reg = pin // 16
    # Get the register bit
    bit = (pin % 16) * 2
    # Clear the original configuration
    gpio.pull[reg] &= ~(0x03 << bit)
    # Set pull-up/down
    gpio.pull[reg] |= (pull << bit)


# Set the level of the specified GPIO_A pin
def set_level(gpio: gpio_t, pin: int, level: int) -> None:
    if level == HIGH:
        gpio.data |= (1 << pin)
    elif level == LOW:
        gpio.data &= ~(1 << pin)


# Read the level of the specified GPIO_A pin
def get_level(gpio: gpio_t, pin: int) -> int:
    # Only the data register is read; the pin configuration is left untouched
    return (gpio.data >> pin) & 0x01
def main():
    # Open the /dev/mem device file in read-write mode
    try:
        mem = open("/dev/mem", O_RDWR | O_SYNC)
    except OSError as e:
        # os.open() raises on failure instead of returning a negative value
        print("Failed to open /dev/mem:", e)
        exit(1)
    # Map the registers to memory
    reg = mmap(
        mem, MMAP_SIZE, MAP_SHARED,
        PROT_READ | PROT_WRITE,
        offset=ALLWINNER_H3_BASE
    )
    # Apply the mapped address offset to the GPIO_A register type
    gpio = gpio_t.from_buffer(reg, GPIO_PA_OFFSET)
    # Set GPIO_A21 as output mode
    set_output(gpio, 21)
    # Set GPIO_A8 as input mode
    set_input(gpio, 8)
    # Enable pull-up for GPIO_A8
    set_pull(gpio, 8, PULL_UP)
    # Blink the LED and read the level until interrupted with Ctrl + C
    try:
        while True:
            # Blink the GPIO_A21 LED
            set_level(gpio, 21, HIGH)
            sleep(0.5)
            set_level(gpio, 21, LOW)
            sleep(0.5)
            # Read and print the level of GPIO_A8
            level = get_level(gpio, 8)
            print("GPIO_A8 level:", level)
    except KeyboardInterrupt:
        pass
    finally:
        # Drop the ctypes view first; mmap refuses to close while it is exported
        del gpio
        # Unmap the memory
        reg.close()
        close(mem)


if __name__ == "__main__":
    main()
Last but not least, we have successfully implemented memory mapping on the Allwinner H3 platform. It works like magic now!