Skip to main content

Kubernetes -- Dynamic Service Account Tokens

Generates temporary Kubernetes service account tokens with RBAC bindings. Each generate call creates a dedicated ServiceAccount, binds it to a ClusterRole, and issues a short-lived token. Revocation deletes the ServiceAccount and RoleBinding, immediately invalidating the token.

Capabilities

CapabilityDescription
secret_generationCreates ServiceAccount + RoleBinding + token
secret_validationRevokes credentials by deleting ServiceAccount and RoleBinding

Configuration

ParameterRequiredDescriptionExample
kubeconfig_pathOptionalPath to kubeconfig file. If omitted, uses in-cluster config (KUBERNETES_SERVICE_HOST)/home/user/.kube/config
api_serverOptionalOverride API server URL. Useful when kubeconfig is not availablehttps://k8s.example.com:6443
bearer_tokenOptionalBearer token for API server auth. Required if kubeconfig_path is omitted and running outside the clustereyJhbGciOi...
ca_certOptionalPEM-encoded CA cert for TLS verification. Set to "" to skip verification (dev only)-----BEGIN CERTIFICATE-----...
namespaceOptionalDefault namespace for created resources. Defaults to defaultarcan-dynamic
token_ttlOptionalDefault token expiration. Defaults to 1h4h

When running inside a Kubernetes pod, the engine auto-detects the service account token at /var/run/secrets/kubernetes.io/serviceaccount/token and the CA cert at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt.

Roles

RoleClusterRole BindingUse Case
viewerviewRead-only access to most resources in the namespace
editoreditRead/write access to most resources (no RBAC modification)
adminadminFull namespace admin including RBAC

Operations

generate

Creates a ServiceAccount, binds it to a ClusterRole, and issues a short-lived token.

Request:

{
"method": "generate",
"params": {
"namespace": "production",
"role": "viewer",
"ttl": "2h"
}
}

What happens internally:

  1. Generates a unique name: arcan-<random-8-chars>
  2. Creates a ServiceAccount via POST /api/v1/namespaces/{ns}/serviceaccounts
  3. Creates a RoleBinding via POST /apis/rbac.authorization.k8s.io/v1/namespaces/{ns}/rolebindings binding the ServiceAccount to the requested ClusterRole
  4. Requests a token via POST /api/v1/namespaces/{ns}/serviceaccounts/{name}/token with the requested TTL
  5. Returns the token and metadata

Response:

{
"data": {
"token": "eyJhbGciOiJSUzI1NiIs...",
"namespace": "production",
"service_account": "arcan-a1b2c3d4",
"role_binding": "arcan-a1b2c3d4-viewer",
"cluster_role": "view",
"expires_at": "2026-04-02T12:00:00Z"
}
}

validate (revoke)

Deletes the ServiceAccount and RoleBinding, immediately revoking access.

Request:

{
"method": "validate",
"params": {
"service_account": "arcan-a1b2c3d4",
"namespace": "production"
}
}

What happens internally:

  1. Deletes the RoleBinding via DELETE /apis/rbac.authorization.k8s.io/v1/namespaces/{ns}/rolebindings/{name}-{role}
  2. Deletes the ServiceAccount via DELETE /api/v1/namespaces/{ns}/serviceaccounts/{name}
  3. The token becomes invalid immediately (Kubernetes invalidates tokens when the SA is deleted)

Response:

{
"data": {
"valid": false,
"message": "service account arcan-a1b2c3d4 and bindings deleted"
}
}

Complete Go Plugin Source

package main

import (
"bytes"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"

"getarcan.dev/arcan/sdk"
)

type KubeEngine struct {
apiServer string
token string
httpClient *http.Client
namespace string
tokenTTL time.Duration
}

type generateParams struct {
Namespace string `json:"namespace"`
Role string `json:"role"`
TTL string `json:"ttl"`
}

type revokeParams struct {
ServiceAccount string `json:"service_account"`
Namespace string `json:"namespace"`
}

var clusterRoles = map[string]string{
"viewer": "view",
"editor": "edit",
"admin": "admin",
}

func (e *KubeEngine) Describe() sdk.Descriptor {
return sdk.Descriptor{
Name: "kubernetes",
Version: "0.1.0",
DisplayName: "Kubernetes",
Description: "Dynamic service account tokens with RBAC bindings",
Capabilities: []string{"secret_generation", "secret_validation"},
}
}

func (e *KubeEngine) Ping() error {
_, err := e.kubeAPI("GET", "/api/v1/namespaces/default", nil)
return err
}

func (e *KubeEngine) Generate(params json.RawMessage) (*sdk.SecretResult, error) {
var p generateParams
if err := json.Unmarshal(params, &p); err != nil {
return nil, fmt.Errorf("invalid params: %w", err)
}

ns := p.Namespace
if ns == "" {
ns = e.namespace
}

clusterRole, ok := clusterRoles[p.Role]
if !ok {
return nil, fmt.Errorf("unknown role %q -- valid roles: viewer, editor, admin", p.Role)
}

ttl := e.tokenTTL
if p.TTL != "" {
var err error
ttl, err = time.ParseDuration(p.TTL)
if err != nil {
return nil, fmt.Errorf("invalid ttl: %w", err)
}
}

name := "arcan-" + randomHex(4)

// 1. Create ServiceAccount
sa := map[string]any{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": map[string]any{"name": name, "namespace": ns},
}
if _, err := e.kubeAPI("POST", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts", ns), sa); err != nil {
return nil, fmt.Errorf("creating service account: %w", err)
}

// 2. Create RoleBinding
rbName := fmt.Sprintf("%s-%s", name, p.Role)
rb := map[string]any{
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "RoleBinding",
"metadata": map[string]any{"name": rbName, "namespace": ns},
"roleRef": map[string]any{
"apiGroup": "rbac.authorization.k8s.io",
"kind": "ClusterRole",
"name": clusterRole,
},
"subjects": []map[string]any{{
"kind": "ServiceAccount",
"name": name,
"namespace": ns,
}},
}
if _, err := e.kubeAPI("POST", fmt.Sprintf("/apis/rbac.authorization.k8s.io/v1/namespaces/%s/rolebindings", ns), rb); err != nil {
return nil, fmt.Errorf("creating role binding: %w", err)
}

// 3. Request token
ttlSeconds := int64(ttl.Seconds())
tokenReq := map[string]any{
"apiVersion": "authentication.k8s.io/v1",
"kind": "TokenRequest",
"spec": map[string]any{"expirationSeconds": ttlSeconds},
}
tokenResp, err := e.kubeAPI("POST", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts/%s/token", ns, name), tokenReq)
if err != nil {
return nil, fmt.Errorf("requesting token: %w", err)
}

var tokenResult struct {
Status struct {
Token string `json:"token"`
ExpirationTimestamp string `json:"expirationTimestamp"`
} `json:"status"`
}
if err := json.Unmarshal(tokenResp, &tokenResult); err != nil {
return nil, fmt.Errorf("parsing token response: %w", err)
}

return &sdk.SecretResult{
Data: map[string]any{
"token": tokenResult.Status.Token,
"namespace": ns,
"service_account": name,
"role_binding": rbName,
"cluster_role": clusterRole,
"expires_at": tokenResult.Status.ExpirationTimestamp,
},
}, nil
}

func (e *KubeEngine) Validate(params json.RawMessage) (*sdk.ValidationResult, error) {
var p revokeParams
if err := json.Unmarshal(params, &p); err != nil {
return nil, fmt.Errorf("invalid params: %w", err)
}

ns := p.Namespace
if ns == "" {
ns = e.namespace
}

// Delete RoleBinding (best-effort -- may have multiple roles)
rbPrefix := p.ServiceAccount + "-"
for role := range clusterRoles {
rbName := rbPrefix + role
e.kubeAPI("DELETE", fmt.Sprintf("/apis/rbac.authorization.k8s.io/v1/namespaces/%s/rolebindings/%s", ns, rbName), nil)
}

// Delete ServiceAccount
if _, err := e.kubeAPI("DELETE", fmt.Sprintf("/api/v1/namespaces/%s/serviceaccounts/%s", ns, p.ServiceAccount), nil); err != nil {
return nil, fmt.Errorf("deleting service account: %w", err)
}

return &sdk.ValidationResult{
Valid: false,
Message: fmt.Sprintf("service account %s and bindings deleted", p.ServiceAccount),
}, nil
}

func (e *KubeEngine) kubeAPI(method, path string, body any) (json.RawMessage, error) {
var bodyReader io.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(data)
}

req, err := http.NewRequest(method, e.apiServer+path, bodyReader)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+e.token)
req.Header.Set("Content-Type", "application/json")

resp, err := e.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode >= 400 {
return nil, fmt.Errorf("kubernetes API %s %s returned %d: %s", method, path, resp.StatusCode, string(respBody))
}

return respBody, nil
}

func randomHex(n int) string {
b := make([]byte, n)
rand.Read(b)
return hex.EncodeToString(b)
}

func main() {
apiServer := os.Getenv("ARCAN_KUBE_API_SERVER")
token := os.Getenv("ARCAN_KUBE_TOKEN")
namespace := os.Getenv("ARCAN_KUBE_NAMESPACE")

// Auto-detect in-cluster config
if apiServer == "" {
host := os.Getenv("KUBERNETES_SERVICE_HOST")
port := os.Getenv("KUBERNETES_SERVICE_PORT")
if host != "" && port != "" {
apiServer = fmt.Sprintf("https://%s:%s", host, port)
}
}
if token == "" {
if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token"); err == nil {
token = string(data)
}
}
if namespace == "" {
if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
namespace = string(data)
} else {
namespace = "default"
}
}

if apiServer == "" || token == "" {
fmt.Fprintf(os.Stderr, "set ARCAN_KUBE_API_SERVER and ARCAN_KUBE_TOKEN, or run inside a Kubernetes pod\n")
os.Exit(1)
}

eng := &KubeEngine{
apiServer: apiServer,
token: token,
namespace: namespace,
tokenTTL: 1 * time.Hour,
httpClient: &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: os.Getenv("ARCAN_KUBE_SKIP_TLS") == "true",
},
},
},
}
sdk.Serve(eng)
}

Build & Install

mkdir -p engines/kubernetes && cd engines/kubernetes
go mod init getarcan.dev/engines/kubernetes
go get getarcan.dev/arcan/sdk

# Build
go build -o arcan-engine-kubernetes .

# Install
cp arcan-engine-kubernetes ~/.arcan/plugins/

Usage

# Set connection (outside cluster)
export ARCAN_KUBE_API_SERVER="https://k8s.example.com:6443"
export ARCAN_KUBE_TOKEN="eyJhbGciOi..."

# Ping
echo '{"method":"ping","params":{}}' | ./arcan-engine-kubernetes
# => {"data":{"status":"healthy"}}

# Generate viewer credentials
echo '{"method":"generate","params":{"namespace":"production","role":"viewer","ttl":"2h"}}' | ./arcan-engine-kubernetes
# => {"data":{"token":"eyJ...","service_account":"arcan-a1b2c3d4","namespace":"production",...}}

# Use the token
kubectl --token="$TOKEN" --server="$API_SERVER" get pods -n production

# Revoke (delete SA + bindings)
echo '{"method":"validate","params":{"service_account":"arcan-a1b2c3d4","namespace":"production"}}' | ./arcan-engine-kubernetes
# => {"data":{"valid":false,"message":"service account arcan-a1b2c3d4 and bindings deleted"}}

Security Notes

  • The engine's own bearer token needs permissions to create/delete ServiceAccounts, RoleBindings, and TokenRequests in target namespaces. A dedicated ClusterRole with these exact permissions is recommended.
  • Token TTLs are enforced by the Kubernetes API server. The engine cannot issue tokens longer than the cluster's --service-account-max-token-expiration setting.
  • Deleting a ServiceAccount immediately invalidates all tokens issued for it.
  • The engine uses the Kubernetes REST API directly (no client-go dependency) to keep the binary small and dependency-free.
  • Set ARCAN_KUBE_SKIP_TLS=true only in development. Production must verify the API server's TLS certificate.
  • All created resources are prefixed with arcan- for easy identification and cleanup.
  • Consider creating a dedicated namespace (e.g., arcan-dynamic) to isolate dynamically created resources.