implement sse
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
import {
|
||||
API_HA_DeskPosition,
|
||||
BaseService,
|
||||
type HomeAssistantDeskPositionResult,
|
||||
type HomeAssistantEntity,
|
||||
type ServiceResult,
|
||||
type API_HA_DeskPosition,
|
||||
BaseService,
|
||||
type HomeAssistantDeskPositionResult,
|
||||
type HomeAssistantEntity,
|
||||
type ServiceResult,
|
||||
} from "@dpu/shared";
|
||||
import { logWarning } from "@dpu/shared/dist/logger.js";
|
||||
import { calculateSecondsBetween } from "@dpu/shared/dist/timehelper.js";
|
||||
@@ -11,105 +11,107 @@ import { Config } from "../config.js";
|
||||
import type { HomeAssistantClient } from "./client.js";
|
||||
|
||||
export class HomeAssistantService extends BaseService<HomeAssistantClient> {
|
||||
async startStandingAutomation(): Promise<ServiceResult<unknown | string>> {
|
||||
try {
|
||||
const positionResult = await this.getDeskPosition();
|
||||
async startStandingAutomation(): Promise<ServiceResult<unknown | string>> {
|
||||
try {
|
||||
const positionResult = await this.getDeskPosition();
|
||||
|
||||
if (!positionResult.successful) {
|
||||
throw Error(positionResult.result as string);
|
||||
}
|
||||
if (!positionResult.successful) {
|
||||
throw Error(positionResult.result as string);
|
||||
}
|
||||
|
||||
const position = positionResult.result as HomeAssistantDeskPositionResult;
|
||||
const position = positionResult.result as HomeAssistantDeskPositionResult;
|
||||
|
||||
if (position.as_boolean) {
|
||||
throw Error(
|
||||
`desk is already in standing position and has been for ${position.last_changed.toReadable(true)}`,
|
||||
);
|
||||
}
|
||||
if (position.as_boolean) {
|
||||
throw Error(
|
||||
`desk is already in standing position and has been for ${position.last_changed.toReadable(true)}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (position.last_changed.seconds < 300) {
|
||||
throw Error("desk has moved too recently");
|
||||
}
|
||||
if (position.last_changed.seconds < 300) {
|
||||
throw Error("desk has moved too recently");
|
||||
}
|
||||
|
||||
const result = await this.getClient().triggerWebhook(
|
||||
Config.homeassistant.id_webhook_stand,
|
||||
);
|
||||
const result = await this.getClient().triggerWebhook(
|
||||
Config.homeassistant.id_webhook_stand,
|
||||
);
|
||||
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = `error starting stand automation. ${error instanceof Error ? error.message : error}`;
|
||||
logWarning(error_message);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = `error starting stand automation. ${error instanceof Error ? error.message : error}`;
|
||||
logWarning(error_message);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
|
||||
async getDeskPosition(): Promise<
|
||||
ServiceResult<HomeAssistantDeskPositionResult | string>
|
||||
> {
|
||||
try {
|
||||
const raw = await this.getClient().getEntityState(
|
||||
Config.homeassistant.id_desk_sensor_binary,
|
||||
);
|
||||
async getDeskPosition(): Promise<
|
||||
ServiceResult<HomeAssistantDeskPositionResult | string>
|
||||
> {
|
||||
try {
|
||||
const raw = await this.getClient().getEntityState(
|
||||
Config.homeassistant.id_desk_sensor_binary,
|
||||
);
|
||||
|
||||
const position = Number(raw.state);
|
||||
const position = Number(raw.state);
|
||||
|
||||
const result = {
|
||||
raw,
|
||||
as_boolean: position === 1,
|
||||
as_text: () => {
|
||||
if (position === 1) return "standing";
|
||||
if (position === 0) return "sitting";
|
||||
return "unknown";
|
||||
},
|
||||
last_changed: calculateSecondsBetween(
|
||||
new Date(raw.last_changed).getTime(),
|
||||
Date.now(),
|
||||
),
|
||||
};
|
||||
const result = {
|
||||
raw,
|
||||
as_boolean: position === 1,
|
||||
as_text: () => {
|
||||
if (position === 1) return "standing";
|
||||
if (position === 0) return "sitting";
|
||||
return "unknown";
|
||||
},
|
||||
last_changed: calculateSecondsBetween(
|
||||
new Date(raw.last_changed).getTime(),
|
||||
Date.now(),
|
||||
),
|
||||
};
|
||||
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = "error getting desk position";
|
||||
logWarning(error_message, error);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = "error getting desk position";
|
||||
logWarning(error_message, error);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
|
||||
convertPosResultToApiAnswer(position: HomeAssistantDeskPositionResult): API_HA_DeskPosition {
|
||||
return {
|
||||
position: position.as_text(),
|
||||
is_standing: position.as_boolean,
|
||||
last_changed: position.last_changed.toReadable(true),
|
||||
}
|
||||
}
|
||||
convertPosResultToApiAnswer(
|
||||
position: HomeAssistantDeskPositionResult,
|
||||
): API_HA_DeskPosition {
|
||||
return {
|
||||
position: position.as_text(),
|
||||
is_standing: position.as_boolean,
|
||||
last_changed: position.last_changed.toReadable(true),
|
||||
};
|
||||
}
|
||||
|
||||
async getTemperatureText(): Promise<ServiceResult<string>> {
|
||||
try {
|
||||
const entities = await this.getTemperatures();
|
||||
const values = entities
|
||||
.map((entity) => parseFloat(entity.state))
|
||||
.filter((value) => !Number.isNaN(value));
|
||||
const average =
|
||||
values.length > 0
|
||||
? values.reduce((sum, value) => sum + value, 0) / values.length
|
||||
: 0;
|
||||
const result = average.toFixed(2);
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = "error getting temperature as text";
|
||||
logWarning(error_message, error);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
async getTemperatureText(): Promise<ServiceResult<string>> {
|
||||
try {
|
||||
const entities = await this.getTemperatures();
|
||||
const values = entities
|
||||
.map((entity) => parseFloat(entity.state))
|
||||
.filter((value) => !Number.isNaN(value));
|
||||
const average =
|
||||
values.length > 0
|
||||
? values.reduce((sum, value) => sum + value, 0) / values.length
|
||||
: 0;
|
||||
const result = average.toFixed(2);
|
||||
return this.getSuccessfulResult(result);
|
||||
} catch (error) {
|
||||
const error_message = "error getting temperature as text";
|
||||
logWarning(error_message, error);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
|
||||
private async getTemperatures(): Promise<HomeAssistantEntity[]> {
|
||||
try {
|
||||
return await this.getClient().getEntityStates(
|
||||
Config.homeassistant.id_room_sensors,
|
||||
);
|
||||
} catch (error) {
|
||||
logWarning("error getting temperatures:", error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
private async getTemperatures(): Promise<HomeAssistantEntity[]> {
|
||||
try {
|
||||
return await this.getClient().getEntityStates(
|
||||
Config.homeassistant.id_room_sensors,
|
||||
);
|
||||
} catch (error) {
|
||||
logWarning("error getting temperatures:", error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,21 +1,14 @@
|
||||
import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
|
||||
import type { FastifyInstance } from "fastify";
|
||||
import { z } from "zod";
|
||||
import type { TidalService } from "../tidal/service.js";
|
||||
import { HomeAssistantService } from "../homeassistant/service.js";
|
||||
import { HomepageService } from "./service.js";
|
||||
import { API_HA_DeskPosition, TidalGetCurrent } from "@dpu/shared";
|
||||
import { type HomepageService } from "./service.js";
|
||||
import { type API_HA_DeskPosition, type TidalGetCurrent } from "@dpu/shared";
|
||||
|
||||
export async function homepageRoutes(
|
||||
fastify: FastifyInstance,
|
||||
{
|
||||
hpService,
|
||||
verifyAPIKey,
|
||||
}: {
|
||||
hpService: HomepageService
|
||||
verifyAPIKey: (
|
||||
request: FastifyRequest,
|
||||
reply: FastifyReply,
|
||||
) => Promise<void>;
|
||||
hpService: HomepageService;
|
||||
},
|
||||
) {
|
||||
fastify.get(
|
||||
|
||||
@@ -2,6 +2,7 @@ import {
|
||||
BaseService,
|
||||
FullInformation,
|
||||
HomeAssistantDeskPositionResult,
|
||||
SseClientChangeEvent,
|
||||
SseService,
|
||||
TidalGetCurrent,
|
||||
type ServiceResult,
|
||||
@@ -14,33 +15,135 @@ export class HomepageService extends BaseService<null> {
|
||||
private haService: HomeAssistantService;
|
||||
private tidalService: TidalService;
|
||||
private sseService: SseService;
|
||||
private pollingInterval: ReturnType<typeof setInterval> | null = null;
|
||||
private songEndTimeout: ReturnType<typeof setTimeout> | null = null;
|
||||
private lastPoll: FullInformation | null = null;
|
||||
|
||||
constructor(haService: HomeAssistantService, tidalService: TidalService, sseService: SseService) {
|
||||
constructor(
|
||||
haService: HomeAssistantService,
|
||||
tidalService: TidalService,
|
||||
sseService: SseService,
|
||||
) {
|
||||
super(null);
|
||||
this.haService = haService;
|
||||
this.tidalService = tidalService;
|
||||
this.sseService = sseService;
|
||||
this.listenForClientChange();
|
||||
}
|
||||
|
||||
async getFullInformation(): Promise<ServiceResult<FullInformation | string>> {
|
||||
try {
|
||||
const [desk, temp, song] = await Promise.all([
|
||||
this.haService.getDeskPosition(),
|
||||
this.haService.getTemperatureText(),
|
||||
this.tidalService.getSong()
|
||||
this.haService.getDeskPosition(),
|
||||
this.haService.getTemperatureText(),
|
||||
this.tidalService.getSong(),
|
||||
]);
|
||||
|
||||
const result = {
|
||||
ha_desk_position: desk.successful ? this.haService.convertPosResultToApiAnswer(desk.result as HomeAssistantDeskPositionResult) : null,
|
||||
ha_desk_position: desk.successful
|
||||
? this.haService.convertPosResultToApiAnswer(
|
||||
desk.result as HomeAssistantDeskPositionResult,
|
||||
)
|
||||
: null,
|
||||
ha_temp: temp.successful ? temp.result : null,
|
||||
tidal_current: song ? song.result as TidalGetCurrent : null,
|
||||
}
|
||||
tidal_current: song ? (song.result as TidalGetCurrent) : null,
|
||||
};
|
||||
|
||||
return this.getSuccessfulResult(result);
|
||||
return this.getSuccessfulResult(this.updateLastPoll(result));
|
||||
} catch {
|
||||
const error_message = "error getting all information";
|
||||
logWarning(error_message);
|
||||
return this.getErrorResult(error_message);
|
||||
}
|
||||
}
|
||||
|
||||
updateLastPoll(newPoll: FullInformation) {
|
||||
const updates = [];
|
||||
if (
|
||||
this.lastPoll?.ha_desk_position?.is_standing !==
|
||||
newPoll.ha_desk_position?.is_standing
|
||||
) {
|
||||
updates.push({
|
||||
component: "ha_desk_position",
|
||||
data: newPoll.ha_desk_position,
|
||||
});
|
||||
}
|
||||
|
||||
if (this.lastPoll?.ha_temp !== newPoll.ha_temp) {
|
||||
updates.push({
|
||||
component: "ha_temp",
|
||||
data: newPoll.ha_temp,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
this.lastPoll?.tidal_current?.title !== newPoll.tidal_current?.title ||
|
||||
this.lastPoll?.tidal_current?.status !== newPoll.tidal_current?.status
|
||||
) {
|
||||
updates.push({
|
||||
component: "tidal_current",
|
||||
data: newPoll.tidal_current,
|
||||
});
|
||||
}
|
||||
|
||||
this.sseService.notifyClients({
|
||||
type: "update",
|
||||
data: updates,
|
||||
});
|
||||
|
||||
this.lastPoll = newPoll;
|
||||
this.scheduleSongEndPoll(newPoll.tidal_current);
|
||||
return newPoll;
|
||||
}
|
||||
|
||||
private scheduleSongEndPoll(tidal: TidalGetCurrent | null): void {
|
||||
this.clearSongEndPoll();
|
||||
|
||||
if (!tidal || tidal.status === "paused") {
|
||||
return;
|
||||
}
|
||||
|
||||
const remainingSeconds = tidal.durationInSeconds - tidal.currentInSeconds;
|
||||
if (remainingSeconds > 0) {
|
||||
this.songEndTimeout = setTimeout(
|
||||
() => {
|
||||
this.songEndTimeout = null;
|
||||
this.getFullInformation();
|
||||
},
|
||||
(remainingSeconds + 1) * 1000,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private clearSongEndPoll(): void {
|
||||
if (this.songEndTimeout) {
|
||||
clearTimeout(this.songEndTimeout);
|
||||
this.songEndTimeout = null;
|
||||
}
|
||||
}
|
||||
|
||||
listenForClientChange(): void {
|
||||
this.sseService.onClientChange((clientChange: SseClientChangeEvent) => {
|
||||
if (clientChange.clientCount === 0) {
|
||||
this.stopPolling();
|
||||
} else {
|
||||
if (!this.pollingInterval) {
|
||||
this.startPolling();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
startPolling(): void {
|
||||
this.pollingInterval = setInterval(() => {
|
||||
this.getFullInformation();
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
stopPolling(): void {
|
||||
if (this.pollingInterval) {
|
||||
clearInterval(this.pollingInterval);
|
||||
}
|
||||
this.clearSongEndPoll();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,8 +103,8 @@ const port = parseInt(Config.port, 10);
|
||||
// Register routes
|
||||
await fastify.register(homeAssistantRoutes, { haService, verifyAPIKey });
|
||||
await fastify.register(tidalRoutes, { tidalService, verifyAPIKey });
|
||||
await fastify.register(sseRoutes, { sseService, verifyAPIKey });
|
||||
await fastify.register(homepageRoutes, { hpService, verifyAPIKey });
|
||||
await fastify.register(sseRoutes, { sseService });
|
||||
await fastify.register(homepageRoutes, { hpService });
|
||||
|
||||
fastify.get(
|
||||
"/ping",
|
||||
|
||||
@@ -1,49 +1,44 @@
|
||||
import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
|
||||
import { logInfo, SseEvent, SseService } from "@dpu/shared";
|
||||
import { randomUUID } from "crypto";
|
||||
import type { FastifyInstance } from "fastify";
|
||||
import { logInfo, type SseEvent, type SseService } from "@dpu/shared";
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
export async function sseRoutes(
|
||||
fastify: FastifyInstance,
|
||||
{
|
||||
sseService,
|
||||
verifyAPIKey,
|
||||
}: {
|
||||
sseService: SseService;
|
||||
verifyAPIKey: (
|
||||
request: FastifyRequest,
|
||||
reply: FastifyReply,
|
||||
) => Promise<void>;
|
||||
},
|
||||
fastify: FastifyInstance,
|
||||
{
|
||||
sseService,
|
||||
}: {
|
||||
sseService: SseService;
|
||||
},
|
||||
) {
|
||||
fastify.get(
|
||||
"/dpu/events",
|
||||
{
|
||||
schema: {
|
||||
description: "Register for SSE",
|
||||
tags: ["sse"],
|
||||
hide: true
|
||||
},
|
||||
sse: true
|
||||
},
|
||||
async (request, reply) => {
|
||||
reply.sse.keepAlive()
|
||||
fastify.get(
|
||||
"/dpu/events",
|
||||
{
|
||||
schema: {
|
||||
description: "Register for SSE",
|
||||
tags: ["sse"],
|
||||
hide: true,
|
||||
},
|
||||
sse: true,
|
||||
},
|
||||
async (_request, reply) => {
|
||||
reply.sse.keepAlive();
|
||||
|
||||
const clientId = randomUUID();
|
||||
const sendEvent = (data: SseEvent) => {
|
||||
reply.sse.send({
|
||||
data: JSON.stringify(data)
|
||||
});
|
||||
};
|
||||
const clientId = randomUUID();
|
||||
const sendEvent = (data: SseEvent) => {
|
||||
reply.sse.send({
|
||||
data: JSON.stringify(data),
|
||||
});
|
||||
};
|
||||
|
||||
sseService.addClient({ id: clientId, send: sendEvent });
|
||||
sseService.addClient({ id: clientId, send: sendEvent });
|
||||
|
||||
await reply.sse.send({ data: 'Connected' });
|
||||
logInfo(`Connection for client ${clientId} established`);
|
||||
await reply.sse.send({ data: "Connected" });
|
||||
logInfo(`Connection for client ${clientId} established`);
|
||||
|
||||
reply.sse.onClose(() => {
|
||||
sseService.removeClient(clientId);
|
||||
logInfo(`Connection for client ${clientId} closed`);
|
||||
})
|
||||
},
|
||||
);
|
||||
reply.sse.onClose(() => {
|
||||
sseService.removeClient(clientId);
|
||||
logInfo(`Connection for client ${clientId} closed`);
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user