Kubernetes -- Dynamic Service Account Tokens
Generates temporary Kubernetes service account tokens with RBAC bindings. Each generate call creates a dedicated ServiceAccount, binds it to a ClusterRole, and issues a short-lived token. Revocation deletes the ServiceAccount and RoleBinding, immediately invalidating the token.
Capabilities
| Capability | Description |
|---|---|
secret_generation | Creates ServiceAccount + RoleBinding + token |
secret_validation | Revokes credentials by deleting ServiceAccount and RoleBinding |
Configuration
| Parameter | Required | Description | Example |
|---|---|---|---|
kubeconfig_path | Optional | Path to kubeconfig file. If omitted, uses in-cluster config (KUBERNETES_SERVICE_HOST) | /home/user/.kube/config |
api_server | Optional | Override API server URL. Useful when kubeconfig is not available | https://k8s.example.com:6443 |
bearer_token | Optional | Bearer token for API server auth. Required if kubeconfig_path is omitted and running outside the cluster | eyJhbGciOi... |
ca_cert | Optional | PEM-encoded CA cert for TLS verification. Set to "" to skip verification (dev only) | -----BEGIN CERTIFICATE-----... |
namespace | Optional | Default namespace for created resources. Defaults to default | arcan-dynamic |
token_ttl | Optional | Default token expiration. Defaults to 1h | 4h |
When running inside a Kubernetes pod, the engine auto-detects the service account token at /var/run/secrets/kubernetes.io/serviceaccount/token and the CA cert at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt.
Roles
| Role | ClusterRole Binding | Use Case |
|---|---|---|
viewer | view | Read-only access to most resources in the namespace |
editor | edit | Read/write access to most resources (no RBAC modification) |
admin | admin | Full namespace admin including RBAC |
Operations
generate
Creates a ServiceAccount, binds it to a ClusterRole, and issues a short-lived token.
Request:
{
"method": "generate",
"params": {
"namespace": "production",
"role": "viewer",
"ttl": "2h"
}
}
What happens internally:
- Generates a unique name:
arcan-<random-8-chars> - Creates a ServiceAccount via
POST /api/v1/namespaces/{ns}/serviceaccounts - Creates a RoleBinding via
POST /apis/rbac.authorization.k8s.io/v1/namespaces/{ns}/rolebindingsbinding the ServiceAccount to the requested ClusterRole - Requests a token via
POST /api/v1/namespaces/{ns}/serviceaccounts/{name}/tokenwith the requested TTL - Returns the token and metadata
Response:
{
"data": {
"token": "eyJhbGciOiJSUzI1NiIs...",
"namespace": "production",
"service_account": "arcan-a1b2c3d4",
"role_binding": "arcan-a1b2c3d4-viewer",
"cluster_role": "view",
"expires_at": "2026-04-02T12:00:00Z"
}
}
validate (revoke)
Deletes the ServiceAccount and RoleBinding, immediately revoking access.
Request:
{
"method": "validate",
"params": {
"service_account": "arcan-a1b2c3d4",
"namespace": "production"
}
}
What happens internally:
- Deletes the RoleBinding via
DELETE /apis/rbac.authorization.k8s.io/v1/namespaces/{ns}/rolebindings/{name}-{role} - Deletes the ServiceAccount via
DELETE /api/v1/namespaces/{ns}/serviceaccounts/{name} - The token becomes invalid immediately (Kubernetes invalidates tokens when the SA is deleted)
Response:
{
"data": {
"valid": false,
"message": "service account arcan-a1b2c3d4 and bindings deleted"
}
}
Complete Go Plugin Source
package main
import (
"bytes"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"
"getarcan.dev/arcan/sdk"
)
type KubeEngine struct {
apiServer string
token string
httpClient *http.Client
namespace string
tokenTTL time.Duration
}
type generateParams struct {
Namespace string `json:"namespace"`
Role string `json:"role"`
TTL string `json:"ttl"`
}
type revokeParams struct {
ServiceAccount string `json:"service_account"`
Namespace string `json:"namespace"`
}
var clusterRoles = map[string]string{
"viewer": "view",
"editor": "edit",
"admin": "admin",
}
func (e *KubeEngine) Describe() sdk.Descriptor {
return sdk.Descriptor{
Name: "kubernetes",
Version: "0.1.0",
DisplayName: "Kubernetes",
Description: "Dynamic service account tokens with RBAC bindings",
Capabilities: []string{"secret_generation", "secret_validation"},
}
}
func (e *KubeEngine) Ping() error {
_, err := e.kubeAPI("GET", "/api/v1/namespaces/default", nil)
return err
}
func (e *KubeEngine) Generate(params json.RawMessage) (*sdk.SecretResult, error) {
var p generateParams
if err := json.Unmarshal(params, &p); err != nil {
return nil, fmt.Errorf("invalid params: %w", err)
}
ns := p.Namespace
if ns == "" {
ns = e.namespace
}
clusterRole, ok := clusterRoles[p.Role]
if !ok {
return nil, fmt.Errorf("unknown role %q -- valid roles: viewer, editor, admin", p.Role)
}
ttl := e.tokenTTL
if p.TTL != "" {
var err error
ttl, err = time.ParseDuration(p.TTL)
if err != nil {
return nil, fmt.Errorf("invalid ttl: %w", err)
}
}
name := "arcan-" + randomHex(4)
// 1. Create ServiceAccount
sa := map[string]any{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": map[string]any{"name": name, "namespace": ns},
}
if _, err := e.kubeAPI("POST", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts", ns), sa); err != nil {
return nil, fmt.Errorf("creating service account: %w", err)
}
// 2. Create RoleBinding
rbName := fmt.Sprintf("%s-%s", name, p.Role)
rb := map[string]any{
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "RoleBinding",
"metadata": map[string]any{"name": rbName, "namespace": ns},
"roleRef": map[string]any{
"apiGroup": "rbac.authorization.k8s.io",
"kind": "ClusterRole",
"name": clusterRole,
},
"subjects": []map[string]any{{
"kind": "ServiceAccount",
"name": name,
"namespace": ns,
}},
}
if _, err := e.kubeAPI("POST", fmt.Sprintf("/apis/rbac.authorization.k8s.io/v1/namespaces/%s/rolebindings", ns), rb); err != nil {
return nil, fmt.Errorf("creating role binding: %w", err)
}
// 3. Request token
ttlSeconds := int64(ttl.Seconds())
tokenReq := map[string]any{
"apiVersion": "authentication.k8s.io/v1",
"kind": "TokenRequest",
"spec": map[string]any{"expirationSeconds": ttlSeconds},
}
tokenResp, err := e.kubeAPI("POST", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts/%s/token", ns, name), tokenReq)
if err != nil {
return nil, fmt.Errorf("requesting token: %w", err)
}
var tokenResult struct {
Status struct {
Token string `json:"token"`
ExpirationTimestamp string `json:"expirationTimestamp"`
} `json:"status"`
}
if err := json.Unmarshal(tokenResp, &tokenResult); err != nil {
return nil, fmt.Errorf("parsing token response: %w", err)
}
return &sdk.SecretResult{
Data: map[string]any{
"token": tokenResult.Status.Token,
"namespace": ns,
"service_account": name,
"role_binding": rbName,
"cluster_role": clusterRole,
"expires_at": tokenResult.Status.ExpirationTimestamp,
},
}, nil
}
func (e *KubeEngine) Validate(params json.RawMessage) (*sdk.ValidationResult, error) {
var p revokeParams
if err := json.Unmarshal(params, &p); err != nil {
return nil, fmt.Errorf("invalid params: %w", err)
}
ns := p.Namespace
if ns == "" {
ns = e.namespace
}
// Delete RoleBinding (best-effort -- may have multiple roles)
rbPrefix := p.ServiceAccount + "-"
for role := range clusterRoles {
rbName := rbPrefix + role
e.kubeAPI("DELETE", fmt.Sprintf("/apis/rbac.authorization.k8s.io/v1/namespaces/%s/rolebindings/%s", ns, rbName), nil)
}
// Delete ServiceAccount
if _, err := e.kubeAPI("DELETE", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts/%s", ns, p.ServiceAccount), nil); err != nil {
return nil, fmt.Errorf("deleting service account: %w", err)
}
return &sdk.ValidationResult{
Valid: false,
Message: fmt.Sprintf("service account %s and bindings deleted", p.ServiceAccount),
}, nil
}
func (e *KubeEngine) kubeAPI(method, path string, body any) (json.RawMessage, error) {
var bodyReader io.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(data)
}
req, err := http.NewRequest(method, e.apiServer+path, bodyReader)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+e.token)
req.Header.Set("Content-Type", "application/json")
resp, err := e.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("kubernetes API %s %s returned %d: %s", method, path, resp.StatusCode, string(respBody))
}
return respBody, nil
}
func randomHex(n int) string {
b := make([]byte, n)
rand.Read(b)
return hex.EncodeToString(b)
}
func main() {
apiServer := os.Getenv("ARCAN_KUBE_API_SERVER")
token := os.Getenv("ARCAN_KUBE_TOKEN")
namespace := os.Getenv("ARCAN_KUBE_NAMESPACE")
// Auto-detect in-cluster config
if apiServer == "" {
host := os.Getenv("KUBERNETES_SERVICE_HOST")
port := os.Getenv("KUBERNETES_SERVICE_PORT")
if host != "" && port != "" {
apiServer = fmt.Sprintf("https://%s:%s", host, port)
}
}
if token == "" {
if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token"); err == nil {
token = string(data)
}
}
if namespace == "" {
if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
namespace = string(data)
} else {
namespace = "default"
}
}
if apiServer == "" || token == "" {
fmt.Fprintf(os.Stderr, "set ARCAN_KUBE_API_SERVER and ARCAN_KUBE_TOKEN, or run inside a Kubernetes pod\n")
os.Exit(1)
}
eng := &KubeEngine{
apiServer: apiServer,
token: token,
namespace: namespace,
tokenTTL: 1 * time.Hour,
httpClient: &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: os.Getenv("ARCAN_KUBE_SKIP_TLS") == "true",
},
},
},
}
sdk.Serve(eng)
}
Build & Install
mkdir -p engines/kubernetes && cd engines/kubernetes
go mod init getarcan.dev/engines/kubernetes
go get getarcan.dev/arcan/sdk
# Build
go build -o arcan-engine-kubernetes .
# Install
cp arcan-engine-kubernetes ~/.arcan/plugins/
Usage
# Set connection (outside cluster)
export ARCAN_KUBE_API_SERVER="https://k8s.example.com:6443"
export ARCAN_KUBE_TOKEN="eyJhbGciOi..."
# Ping
echo '{"method":"ping","params":{}}' | ./arcan-engine-kubernetes
# => {"data":{"status":"healthy"}}
# Generate viewer credentials
echo '{"method":"generate","params":{"namespace":"production","role":"viewer","ttl":"2h"}}' | ./arcan-engine-kubernetes
# => {"data":{"token":"eyJ...","service_account":"arcan-a1b2c3d4","namespace":"production",...}}
# Use the token
kubectl --token="$TOKEN" --server="$API_SERVER" get pods -n production
# Revoke (delete SA + bindings)
echo '{"method":"validate","params":{"service_account":"arcan-a1b2c3d4","namespace":"production"}}' | ./arcan-engine-kubernetes
# => {"data":{"valid":false,"message":"service account arcan-a1b2c3d4 and bindings deleted"}}
Security Notes
- The engine's own bearer token needs permissions to create/delete ServiceAccounts, RoleBindings, and TokenRequests in target namespaces. A dedicated ClusterRole with these exact permissions is recommended.
- Token TTLs are enforced by the Kubernetes API server. The engine cannot issue tokens longer than the cluster's
--service-account-max-token-expirationsetting. - Deleting a ServiceAccount immediately invalidates all tokens issued for it.
- The engine uses the Kubernetes REST API directly (no
client-godependency) to keep the binary small and dependency-free. - Set
ARCAN_KUBE_SKIP_TLS=trueonly in development. Production must verify the API server's TLS certificate. - All created resources are prefixed with
arcan-for easy identification and cleanup. - Consider creating a dedicated namespace (e.g.,
arcan-dynamic) to isolate dynamically created resources.