feat(python): add queued worker runtime and configurable transform mode

This commit is contained in:
2026-02-23 22:26:54 +01:00
parent 8e8f099768
commit 838ea34ab7
21 changed files with 744 additions and 88 deletions

View File

@@ -0,0 +1,261 @@
import * as path from 'path';
import { Worker } from 'worker_threads';
interface WorkerRunTransformRequest {
type: 'runTransform';
requestId: string;
scriptContent: string;
entrypoint: string;
payloadJson: string;
}
interface WorkerReadyMessage {
type: 'ready';
}
interface WorkerResultMessage {
type: 'transformResult';
requestId: string;
output: unknown;
toasts: string[];
}
interface WorkerErrorMessage {
type: 'transformError';
requestId: string;
error: string;
}
interface WorkerFatalErrorMessage {
type: 'error';
error: string;
}
type WorkerResponseMessage = WorkerReadyMessage | WorkerResultMessage | WorkerErrorMessage | WorkerFatalErrorMessage;
interface QueuedRequest {
request: WorkerRunTransformRequest;
timeoutMs: number;
resolve: (value: { output: unknown; toasts: string[] }) => void;
reject: (error: Error) => void;
}
interface ActiveRequest extends QueuedRequest {
timeoutId: ReturnType<typeof setTimeout> | null;
}
export class BlogmarkPythonWorkerRuntime {
private worker: Worker | null = null;
private workerReady = false;
private workerStartPromise: Promise<void> | null = null;
private workerStartResolve: (() => void) | null = null;
private workerStartReject: ((error: Error) => void) | null = null;
private activeRequest: ActiveRequest | null = null;
private queue: QueuedRequest[] = [];
private requestCounter = 0;
async executeTransform(params: {
scriptContent: string;
entrypoint: string;
payloadJson: string;
timeoutMs?: number;
}): Promise<{ output: unknown; toasts: string[] }> {
const requestId = this.nextRequestId();
const timeoutMs = params.timeoutMs ?? 5000;
return new Promise<{ output: unknown; toasts: string[] }>((resolve, reject) => {
this.queue.push({
request: {
type: 'runTransform',
requestId,
scriptContent: params.scriptContent,
entrypoint: params.entrypoint,
payloadJson: params.payloadJson,
},
timeoutMs,
resolve,
reject,
});
this.dispatchNext().catch((error) => {
reject(error instanceof Error ? error : new Error(String(error)));
});
});
}
dispose(): void {
this.rejectStartPromise(new Error('Python worker runtime disposed'));
this.rejectActiveAndQueue(new Error('Python worker runtime disposed'));
this.resetWorker();
}
private async dispatchNext(): Promise<void> {
if (this.activeRequest || this.queue.length === 0) {
return;
}
await this.ensureWorkerStarted();
const nextRequest = this.queue.shift();
if (!nextRequest) {
return;
}
const timeoutId = setTimeout(() => {
if (!this.activeRequest || this.activeRequest.request.requestId !== nextRequest.request.requestId) {
return;
}
const timeoutError = new Error(`Python transform timed out after ${nextRequest.timeoutMs}ms`);
this.activeRequest.reject(timeoutError);
this.activeRequest = null;
this.resetWorker();
void this.dispatchNext();
}, nextRequest.timeoutMs);
this.activeRequest = {
...nextRequest,
timeoutId,
};
this.worker?.postMessage(nextRequest.request);
}
private async ensureWorkerStarted(): Promise<void> {
if (this.worker && this.workerReady) {
return;
}
if (this.workerStartPromise) {
return this.workerStartPromise;
}
const workerPath = path.join(__dirname, 'blogmarkPython.worker.js');
this.worker = new Worker(workerPath);
this.workerReady = false;
this.worker.on('message', (message: WorkerResponseMessage) => {
this.handleWorkerMessage(message);
});
this.worker.on('error', (error) => {
this.handleWorkerCrash(error instanceof Error ? error : new Error(String(error)));
});
this.worker.on('exit', (code) => {
if (code !== 0) {
this.handleWorkerCrash(new Error(`Python worker exited with code ${code}`));
}
});
this.workerStartPromise = new Promise<void>((resolve, reject) => {
this.workerStartResolve = resolve;
this.workerStartReject = reject;
});
return this.workerStartPromise;
}
private handleWorkerMessage(message: WorkerResponseMessage): void {
if (message.type === 'ready') {
this.workerReady = true;
this.resolveStartPromise();
return;
}
if (message.type === 'error') {
this.handleWorkerCrash(new Error(message.error));
return;
}
const active = this.activeRequest;
if (!active) {
return;
}
if (active.request.requestId !== message.requestId) {
return;
}
if (active.timeoutId) {
clearTimeout(active.timeoutId);
}
this.activeRequest = null;
if (message.type === 'transformResult') {
active.resolve({ output: message.output, toasts: message.toasts });
} else {
active.reject(new Error(message.error));
}
void this.dispatchNext();
}
private handleWorkerCrash(error: Error): void {
this.rejectStartPromise(error);
this.rejectActiveAndQueue(error);
this.resetWorker();
}
private rejectActiveAndQueue(error: Error): void {
if (this.activeRequest) {
if (this.activeRequest.timeoutId) {
clearTimeout(this.activeRequest.timeoutId);
}
this.activeRequest.reject(error);
this.activeRequest = null;
}
while (this.queue.length > 0) {
const queued = this.queue.shift();
queued?.reject(error);
}
}
private resolveStartPromise(): void {
if (this.workerStartResolve) {
this.workerStartResolve();
}
this.workerStartResolve = null;
this.workerStartReject = null;
this.workerStartPromise = null;
}
private rejectStartPromise(error: Error): void {
if (this.workerStartReject) {
this.workerStartReject(error);
}
this.workerStartResolve = null;
this.workerStartReject = null;
this.workerStartPromise = null;
}
private resetWorker(): void {
if (this.worker) {
this.worker.removeAllListeners();
this.worker.terminate();
}
this.worker = null;
this.workerReady = false;
this.workerStartPromise = null;
this.workerStartResolve = null;
this.workerStartReject = null;
}
private nextRequestId(): string {
this.requestCounter += 1;
return `blogmark-py-${this.requestCounter}`;
}
}
let blogmarkPythonWorkerRuntimeInstance: BlogmarkPythonWorkerRuntime | null = null;
export function getBlogmarkPythonWorkerRuntime(): BlogmarkPythonWorkerRuntime {
if (!blogmarkPythonWorkerRuntimeInstance) {
blogmarkPythonWorkerRuntimeInstance = new BlogmarkPythonWorkerRuntime();
}
return blogmarkPythonWorkerRuntimeInstance;
}

View File

@@ -1,5 +1,7 @@
import { z } from 'zod';
import { getScriptEngine } from './ScriptEngine';
import { getMetaEngine } from './MetaEngine';
import { getBlogmarkPythonWorkerRuntime } from './BlogmarkPythonWorkerRuntime';
const transformPostSchema = z.object({
title: z.string().trim().min(1),
@@ -55,6 +57,8 @@ export interface BlogmarkTransformResult {
toasts: string[];
}
export type PythonRuntimeMode = 'webworker' | 'main-thread';
const MAX_TOASTS_PER_SCRIPT = 5;
const MAX_TOASTS_TOTAL = 20;
const MAX_TOAST_LENGTH = 300;
@@ -142,6 +146,28 @@ function toErrorMessage(error: unknown): string {
return String(error);
}
function resolveTransformEntrypoint(value: string): string {
const nextEntrypoint = typeof value === 'string' ? value.trim() : '';
if (/^[A-Za-z_][A-Za-z0-9_]*$/.test(nextEntrypoint) && nextEntrypoint !== 'main') {
return nextEntrypoint;
}
return 'transform';
}
function resolvePythonRuntimeMode(value: unknown): PythonRuntimeMode {
if (value === 'main-thread') {
return 'main-thread';
}
return 'webworker';
}
async function getConfiguredPythonRuntimeMode(): Promise<PythonRuntimeMode> {
const metadata = await getMetaEngine().getProjectMetadata();
return resolvePythonRuntimeMode((metadata as { pythonRuntimeMode?: unknown } | null)?.pythonRuntimeMode);
}
class PythonBlogmarkTransformExecutor implements BlogmarkTransformExecutor {
private runtimePromise: Promise<any> | null = null;
@@ -169,7 +195,7 @@ def toast(message):
await runtime.runPythonAsync(script.content);
const requestedEntrypoint = this.resolveEntrypoint(script.entrypoint);
const requestedEntrypoint = resolveTransformEntrypoint(script.entrypoint);
const payload = JSON.stringify(input);
runtime.globals.set('__bds_transform_payload_json', payload);
runtime.globals.set('__bds_transform_entrypoint', requestedEntrypoint);
@@ -200,15 +226,6 @@ json.dumps(_result)
};
}
private resolveEntrypoint(value: string): string {
const nextEntrypoint = typeof value === 'string' ? value.trim() : '';
if (/^[A-Za-z_][A-Za-z0-9_]*$/.test(nextEntrypoint) && nextEntrypoint !== 'main') {
return nextEntrypoint;
}
return 'transform';
}
private async getRuntime(): Promise<any> {
if (!this.runtimePromise) {
this.runtimePromise = (async () => {
@@ -221,11 +238,26 @@ json.dumps(_result)
}
}
class PythonWorkerBlogmarkTransformExecutor implements BlogmarkTransformExecutor {
async runTransform(script: BlogmarkTransformScriptRecord, input: BlogmarkTransformInput): Promise<unknown> {
return getBlogmarkPythonWorkerRuntime().executeTransform({
scriptContent: script.content,
entrypoint: resolveTransformEntrypoint(script.entrypoint),
payloadJson: JSON.stringify(input),
});
}
}
const mainThreadExecutor = new PythonBlogmarkTransformExecutor();
const workerExecutor = new PythonWorkerBlogmarkTransformExecutor();
export class BlogmarkTransformService {
constructor(
private readonly dependencies: {
provider?: BlogmarkTransformScriptProvider;
executor?: BlogmarkTransformExecutor;
resolvePythonRuntimeMode?: () => Promise<PythonRuntimeMode>;
executors?: Partial<Record<PythonRuntimeMode, BlogmarkTransformExecutor>>;
} = {},
) {}
@@ -237,7 +269,7 @@ export class BlogmarkTransformService {
};
const provider = this.dependencies.provider ?? scriptEngineBackedProvider;
const executor = this.dependencies.executor ?? new PythonBlogmarkTransformExecutor();
const executor = this.dependencies.executor ?? await this.resolveExecutorForConfiguredRuntime();
const scripts = await provider.getScripts();
const activeTransforms = scripts
@@ -303,6 +335,18 @@ export class BlogmarkTransformService {
toasts,
};
}
private async resolveExecutorForConfiguredRuntime(): Promise<BlogmarkTransformExecutor> {
const resolveMode = this.dependencies.resolvePythonRuntimeMode ?? getConfiguredPythonRuntimeMode;
const mode = await resolveMode();
const executors = this.dependencies.executors ?? {};
if (mode === 'main-thread') {
return executors['main-thread'] ?? mainThreadExecutor;
}
return executors.webworker ?? workerExecutor;
}
}
let blogmarkTransformServiceInstance: BlogmarkTransformService | null = null;

View File

@@ -24,6 +24,7 @@ export interface ProjectMetadata {
defaultAuthor?: string; // Default author for new posts and media
maxPostsPerPage?: number; // Preview/server maximum posts per page (default 50)
blogmarkCategory?: string; // Category used for externally captured bookmark posts
pythonRuntimeMode?: 'webworker' | 'main-thread'; // Runtime mode for Python script execution
picoTheme?: PicoThemeName; // Selected Pico CSS theme for preview/rendering
categoryMetadata?: Record<string, CategoryMetadata>; // Per-category metadata for UI/rendering
categorySettings?: Record<string, CategoryRenderSettings>; // Per-category list rendering preferences
@@ -85,6 +86,7 @@ function normalizeProjectMetadata(metadata: ProjectMetadata): ProjectMetadata {
const blogmarkCategory = typeof metadata.blogmarkCategory === 'string'
? normalizeNonEmptyTaxonomyTerm(metadata.blogmarkCategory) ?? undefined
: undefined;
const pythonRuntimeMode = metadata.pythonRuntimeMode === 'main-thread' ? 'main-thread' : 'webworker';
const picoTheme = sanitizePicoTheme(metadata.picoTheme);
const categoryMetadata = normalizeCategoryMetadata(metadata.categoryMetadata ?? metadata.categorySettings);
return {
@@ -92,6 +94,7 @@ function normalizeProjectMetadata(metadata: ProjectMetadata): ProjectMetadata {
publicUrl,
maxPostsPerPage,
blogmarkCategory,
pythonRuntimeMode,
picoTheme,
categoryMetadata,
categorySettings: undefined,
@@ -306,6 +309,7 @@ export class MetaEngine extends EventEmitter {
defaultAuthor: normalizedUpdates.defaultAuthor,
maxPostsPerPage: normalizedUpdates.maxPostsPerPage,
blogmarkCategory: normalizedUpdates.blogmarkCategory,
pythonRuntimeMode: normalizedUpdates.pythonRuntimeMode,
picoTheme: normalizedUpdates.picoTheme,
categoryMetadata: normalizedUpdates.categoryMetadata,
});

View File

@@ -0,0 +1,150 @@
import { parentPort } from 'worker_threads';
interface WorkerRunTransformRequest {
type: 'runTransform';
requestId: string;
scriptContent: string;
entrypoint: string;
payloadJson: string;
}
interface WorkerReadyMessage {
type: 'ready';
}
interface WorkerResultMessage {
type: 'transformResult';
requestId: string;
output: unknown;
toasts: string[];
}
interface WorkerErrorMessage {
type: 'transformError';
requestId: string;
error: string;
}
interface WorkerFatalErrorMessage {
type: 'error';
error: string;
}
type WorkerResponseMessage = WorkerReadyMessage | WorkerResultMessage | WorkerErrorMessage | WorkerFatalErrorMessage;
type PyodideRuntime = {
globals: {
set: (name: string, value: unknown) => void;
} | any;
runPythonAsync: (code: string) => Promise<unknown>;
};
const MAX_TOASTS_PER_SCRIPT = 5;
const MAX_TOAST_LENGTH = 300;
let runtimePromise: Promise<PyodideRuntime> | null = null;
function postMessage(message: WorkerResponseMessage): void {
parentPort?.postMessage(message);
}
function normalizeToastMessage(value: unknown): string | null {
if (value === undefined || value === null) {
return null;
}
const normalized = String(value).trim();
if (normalized.length === 0) {
return null;
}
return normalized.slice(0, MAX_TOAST_LENGTH);
}
async function getRuntime(): Promise<PyodideRuntime> {
if (!runtimePromise) {
runtimePromise = (async () => {
const pyodideModule = await import('pyodide');
return (await pyodideModule.loadPyodide()) as unknown as PyodideRuntime;
})();
}
return runtimePromise;
}
async function runTransform(request: WorkerRunTransformRequest): Promise<void> {
try {
const runtime = await getRuntime();
const toastMessages: string[] = [];
const pushToast = (message: unknown): void => {
if (toastMessages.length >= MAX_TOASTS_PER_SCRIPT) {
return;
}
const normalized = normalizeToastMessage(message);
if (!normalized) {
return;
}
toastMessages.push(normalized);
};
runtime.globals.set('__bds_push_toast', pushToast);
await runtime.runPythonAsync(`
def toast(message):
__bds_push_toast(str(message))
`);
await runtime.runPythonAsync(request.scriptContent);
runtime.globals.set('__bds_transform_payload_json', request.payloadJson);
runtime.globals.set('__bds_transform_entrypoint', request.entrypoint);
const rawResult = await runtime.runPythonAsync(`
import json
_payload = json.loads(__bds_transform_payload_json)
_entrypoint = __bds_transform_entrypoint
_transform_fn = globals().get(_entrypoint)
if _transform_fn is None or not callable(_transform_fn):
raise RuntimeError(f"Transform entrypoint '{_entrypoint}' is not callable")
_post = _payload.get("post")
if not isinstance(_post, dict):
raise RuntimeError("Transform payload is missing a valid 'post' object")
_context = _payload.get("context")
try:
_result = _transform_fn(_post, _context)
except TypeError:
_result = _transform_fn(_post)
if _result is None:
_result = _post
json.dumps(_result)
`);
postMessage({
type: 'transformResult',
requestId: request.requestId,
output: JSON.parse(String(rawResult)),
toasts: toastMessages,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
postMessage({ type: 'transformError', requestId: request.requestId, error: message });
}
}
parentPort?.on('message', (message: WorkerRunTransformRequest) => {
if (message.type !== 'runTransform') {
return;
}
void runTransform(message);
});
void getRuntime()
.then(() => {
postMessage({ type: 'ready' });
})
.catch((error) => {
const message = error instanceof Error ? error.message : String(error);
postMessage({ type: 'error', error: message });
});

View File

@@ -1006,7 +1006,7 @@ export function registerIpcHandlers(): void {
return engine.getProjectMetadata();
});
safeHandle('meta:updateProjectMetadata', async (_, updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; picoTheme?: import('../shared/picoThemes').PicoThemeName; categoryMetadata?: Record<string, { renderInLists: boolean; showTitle: boolean; title: string }>; categorySettings?: Record<string, { renderInLists: boolean; showTitle: boolean }> }) => {
safeHandle('meta:updateProjectMetadata', async (_, updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; blogmarkCategory?: string; pythonRuntimeMode?: 'webworker' | 'main-thread'; picoTheme?: import('../shared/picoThemes').PicoThemeName; categoryMetadata?: Record<string, { renderInLists: boolean; showTitle: boolean; title: string }>; categorySettings?: Record<string, { renderInLists: boolean; showTitle: boolean }> }) => {
const engine = getMetaEngine();
await ensureMetaContext(engine);
await engine.updateProjectMetadata(updates);

View File

@@ -172,7 +172,7 @@ export const electronAPI: ElectronAPI = {
syncOnStartup: () => ipcRenderer.invoke('meta:syncOnStartup'),
getProjectMetadata: () => ipcRenderer.invoke('meta:getProjectMetadata'),
setProjectMetadata: (metadata: { name: string; description?: string }) => ipcRenderer.invoke('meta:setProjectMetadata', metadata),
updateProjectMetadata: (updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; blogmarkCategory?: string; picoTheme?: import('./shared/picoThemes').PicoThemeName; categoryMetadata?: Record<string, { renderInLists: boolean; showTitle: boolean; title: string }>; categorySettings?: Record<string, { renderInLists: boolean; showTitle: boolean }> }) => ipcRenderer.invoke('meta:updateProjectMetadata', updates),
updateProjectMetadata: (updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; blogmarkCategory?: string; pythonRuntimeMode?: 'webworker' | 'main-thread'; picoTheme?: import('./shared/picoThemes').PicoThemeName; categoryMetadata?: Record<string, { renderInLists: boolean; showTitle: boolean; title: string }>; categorySettings?: Record<string, { renderInLists: boolean; showTitle: boolean }> }) => ipcRenderer.invoke('meta:updateProjectMetadata', updates),
},
// Tag Management (advanced tag operations)

View File

@@ -43,6 +43,7 @@ export interface ProjectMetadata {
defaultAuthor?: string;
maxPostsPerPage?: number;
blogmarkCategory?: string;
pythonRuntimeMode?: 'webworker' | 'main-thread';
picoTheme?: import('./picoThemes').PicoThemeName;
categoryMetadata?: Record<string, CategoryMetadata>;
categorySettings?: Record<string, CategoryRenderSettings>;
@@ -619,7 +620,7 @@ export interface ElectronAPI {
syncOnStartup: () => Promise<{ tags: string[]; categories: string[]; projectMetadata: ProjectMetadata | null }>;
getProjectMetadata: () => Promise<ProjectMetadata | null>;
setProjectMetadata: (metadata: { name: string; description?: string }) => Promise<ProjectMetadata | null>;
updateProjectMetadata: (updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; blogmarkCategory?: string; picoTheme?: import('./picoThemes').PicoThemeName; categoryMetadata?: Record<string, CategoryMetadata>; categorySettings?: Record<string, CategoryRenderSettings> }) => Promise<ProjectMetadata | null>;
updateProjectMetadata: (updates: { name?: string; description?: string; dataPath?: string; publicUrl?: string; mainLanguage?: string; defaultAuthor?: string; maxPostsPerPage?: number; blogmarkCategory?: string; pythonRuntimeMode?: 'webworker' | 'main-thread'; picoTheme?: import('./picoThemes').PicoThemeName; categoryMetadata?: Record<string, CategoryMetadata>; categorySettings?: Record<string, CategoryRenderSettings> }) => Promise<ProjectMetadata | null>;
};
tags: {
getAll: () => Promise<TagData[]>;