2 changes: 2 additions & 0 deletions docker-compose.dev.yml
@@ -54,6 +54,8 @@ services:
build:
context: ./frontend
dockerfile: Dockerfile.dev
volumes:
- ./frontend/src:/frontend/src

caddy:
image: caddy:2.6.1
1 change: 1 addition & 0 deletions frontend/.dockerignore
@@ -0,0 +1 @@
node_modules
2 changes: 2 additions & 0 deletions frontend/Dockerfile.dev
@@ -11,4 +11,6 @@ COPY . .

RUN pnpm i

VOLUME /frontend/src

CMD ["pnpm", "vite", "--port", "3000", "--host"]
4 changes: 3 additions & 1 deletion frontend/src/Layout.tsx
@@ -9,6 +9,7 @@ import Overview from './pages/Overview/Overview'
import Clusters from './pages/Clusters/Clusters'
import Backups from './pages/Backups/Backups'
import ScheduledBackups from './pages/Backups/ScheduledBackups'
import Replication from './pages/Replication/Replication'
import Errors from './pages/Errors/Errors'
import { Switch, Route, useHistory } from 'react-router-dom'

@@ -53,6 +54,7 @@ const items: MenuItem[] = [
},
{ key: 'query_performance', label: 'Query performance', icon: <ClockCircleOutlined /> },
{ key: 'running_queries', label: 'Running queries', icon: <DashboardOutlined /> },
{ key: 'replication', label: 'Replication', icon: <DashboardOutlined /> },
{ key: 'schema', label: 'Schema stats', icon: <HddOutlined /> },
{ key: 'disk_usage', label: 'Disk usage', icon: <ApartmentOutlined /> },
{ key: 'logs', label: 'Logs', icon: <BarsOutlined /> },
@@ -122,9 +124,9 @@ export default function AppLayout(): JSX.Element {
<Route exact path="/query_performance" component={SlowQueries}></Route>
<Route exact path="/schema" component={Schema}></Route>
<Route exact path="/schema/:table" component={SchemaTable}></Route>

<Route exact path="/query_performance/:query_hash" component={QueryDetail}></Route>
<Route exact path="/operations" component={Operations}></Route>
<Route exact path="/replication" component={Replication}></Route>
<Route exact path="/running_queries" component={RunningQueries}></Route>
<Route exact path="/logs" component={Logs}></Route>
<Route exact path="/errors" component={Errors}></Route>
196 changes: 196 additions & 0 deletions frontend/src/pages/Replication/Replication.tsx
@@ -0,0 +1,196 @@
import { Table, Button, notification, Typography, Tooltip, Spin, Select, Space } from 'antd'
import { usePollingEffect } from '../../utils/usePollingEffect'
import React, { useState, useEffect } from 'react'
import { ColumnType } from 'antd/es/table'

const { Paragraph } = Typography

interface ClusterNode {
cluster: string
shard_num: number
shard_weight: number
replica_num: number
host_name: string
host_address: string
port: number
is_local: number
user: string
default_database: string
errors_count: number
slowdowns_count: number
estimated_recovery_time: number
}

interface Cluster {
cluster: string
nodes: ClusterNode[]
}

interface ReplicationQueueItem {
host_name: string
database: string
table: string
position: number
error: string
last_attempt_time: string
num_attempts: number
type: string
postpone_reason: string
postpone: string
}

export default function Replication() {
const [replicationQueue, setReplicationQueue] = useState<ReplicationQueueItem[]>([])
const [loadingReplication, setLoadingReplication] = useState(false)
const [selectedCluster, setSelectedCluster] = useState<string>('')
const [clusters, setClusters] = useState<Cluster[]>([])
const [loadingClusters, setLoadingClusters] = useState(false)

useEffect(() => {
const fetchClusters = async () => {
setLoadingClusters(true)
try {
const res = await fetch('/api/clusters')
const resJson: Cluster[] = await res.json()
setClusters(resJson)
if (resJson.length > 0) {
setSelectedCluster(resJson[0].cluster)
}
} catch (err) {
notification.error({
message: 'Failed to fetch clusters',
description: 'Please try again later',
})
}
setLoadingClusters(false)
}
fetchClusters()
}, [])

const columns: ColumnType<ReplicationQueueItem>[] = [
Reviewer comment (Contributor): Could we also retrieve the last_exception_time? It's useful to compare the time the exception happened with the last_attempt_time. Also, having the source_replica gives a hint of which host could be the source of the issue. 🕵️

{
title: 'Host',
dataIndex: 'host_name',
key: 'host_name',
},
{
title: 'Database',
dataIndex: 'database',
key: 'database',
},
{
title: 'Table',
dataIndex: 'table',
key: 'table',
},
{
title: 'Type',
dataIndex: 'type',
key: 'type',
},
{
title: 'Last Postpone',
dataIndex: 'last_postpone_time',
key: 'postpone_time',
},
{
title: 'Postpone Reason',
dataIndex: 'postpone_reason',
key: 'postpone_reason',
render: (reason: string) => (
reason ? <Paragraph
style={{ maxWidth: '400px', color: 'orange' }}
ellipsis={{
rows: 2,
expandable: true,
}}
>
{reason}
</Paragraph> : null
),
},
{
title: 'Last Attempt',
dataIndex: 'last_attempt_time',
key: 'last_attempt_time',
},
{
title: 'Exception',
dataIndex: 'last_exception',
key: 'last_exception',
render: (last_exception: string) => (
last_exception ? <Paragraph
style={{ maxWidth: '400px', color: 'red' }}
ellipsis={{
rows: 2,
expandable: true,
}}
>
{last_exception}
</Paragraph> : null
),
},
{
title: 'Attempts',
dataIndex: 'num_attempts',
key: 'num_attempts',
},
]

usePollingEffect(
async () => {
if (!selectedCluster) return

setLoadingReplication(true)
try {
const res = await fetch(`/api/replication/?cluster=${selectedCluster}`)
const resJson: ReplicationQueueItem[] = await res.json()
// Filter for failed items only
const failedItems = resJson.filter((item) => item.error)
setReplicationQueue(failedItems)
} catch (err) {
notification.error({
message: 'Failed to fetch replication queue',
description: 'Please try again later',
})
}
setLoadingReplication(false)
},
[selectedCluster],
{ interval: 5000 }
)

return (
<>
<Space direction="vertical" size="large" style={{ width: '100%' }}>
<Space>
<h1 style={{ margin: 0 }}>
{`${replicationQueue.length}`} Failed Replication Queue Items
</h1>
{loadingReplication && <Spin />}
</Space>

<Select
style={{ width: 200 }}
value={selectedCluster}
onChange={setSelectedCluster}
loading={loadingClusters}
placeholder="Select a cluster"
>
{clusters.map((cluster) => (
<Select.Option key={cluster.cluster} value={cluster.cluster}>
{cluster.cluster}
</Select.Option>
))}
</Select>

<Table
columns={columns}
dataSource={replicationQueue}
loading={replicationQueue.length === 0 && loadingReplication}
rowKey={(record) => `${record.host_name}-${record.table}-${record.position}`}
/>
</Space>
</>
)
}
20 changes: 10 additions & 10 deletions frontend/src/utils/usePollingEffect.tsx
@@ -2,23 +2,23 @@ import { useEffect, useRef } from 'react'

export function usePollingEffect(
asyncCallback: any,
dependencies = [],
dependencies: any[] = [],
{
interval = 3000, // 3 seconds,
onCleanUp = () => {},
onCleanUp = () => { },
} = {}
) {
const timeoutIdRef = useRef<number | null>(null)
useEffect(() => {
let _stopped = false
;(async function pollingCallback() {
try {
await asyncCallback()
} finally {
// Set timeout after it finished, unless stopped
timeoutIdRef.current = !_stopped && window.setTimeout(pollingCallback, interval)
}
})()
; (async function pollingCallback() {
try {
await asyncCallback()
} finally {
// Set timeout after it finished, unless stopped
timeoutIdRef.current = !_stopped && window.setTimeout(pollingCallback, interval)
}
})()
// Clean up if dependencies change
return () => {
_stopped = true // prevent racing conditions
9 changes: 8 additions & 1 deletion housewatch/api/cluster.py
@@ -1,5 +1,4 @@
import structlog
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
@@ -15,3 +14,11 @@ def list(self, request: Request) -> Response:

def retrieve(self, request: Request, pk: str) -> Response:
return Response(clusters.get_cluster(pk))


class ReplicationViewset(GenericViewSet):
def list(self, request: Request) -> Response:
cluster = request.query_params.get("cluster")
if not cluster:
return Response({"error": "cluster parameter is required"}, status=400)
return Response(list(clusters.get_replication_queues(cluster)))
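The clusters.get_replication_queues helper that this endpoint calls is not shown in this diff. Here is a minimal sketch of one possible implementation, assuming get_cluster(cluster) returns node dicts with a host_name key (matching the ClusterNode interface above) and reusing the per-node run_query path added in housewatch/clickhouse/client.py below; it also pulls in the last_exception_time and source_replica columns the reviewer asked about, based on ClickHouse's system.replication_queue table. This is an assumed shape, not the PR's actual implementation.

from housewatch.clickhouse.client import run_query  # existing helper, extended below

# Hypothetical query: each node reports its own local replication queue.
REPLICATION_QUEUE_SQL = """
    SELECT
        hostName() AS host_name,
        database,
        table,
        position,
        type,
        source_replica,
        num_tries AS num_attempts,
        last_exception AS error,
        last_exception,
        last_exception_time,
        last_attempt_time,
        postpone_reason,
        last_postpone_time
    FROM system.replication_queue
    ORDER BY last_attempt_time DESC
"""


def get_replication_queues(cluster: str):
    # Sketch: run the query once per node so every replica's pending and
    # failed entries are included, using the new node-aware run_query.
    queues = []
    for node in get_cluster(cluster)["nodes"]:  # get_cluster assumed to return {"cluster": ..., "nodes": [...]}
        queues.extend(run_query(REPLICATION_QUEUE_SQL, node=node, use_cache=False))
    return queues

Whatever the real query looks like, aliasing last_exception to error keeps the frontend's item.error filter working while still exposing the full exception text in the Exception column.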
60 changes: 36 additions & 24 deletions housewatch/clickhouse/client.py
@@ -1,4 +1,3 @@
import os
from typing import Dict, Optional
from clickhouse_pool import ChPool
from clickhouse_driver import Client
@@ -23,6 +22,24 @@
)


def get_client(node: Optional[Dict] = None):
if node:
client = Client(
host=node["host_name"],
database=settings.CLICKHOUSE_DATABASE,
user=settings.CLICKHOUSE_USER,
secure=settings.CLICKHOUSE_SECURE,
ca_certs=settings.CLICKHOUSE_CA,
verify=settings.CLICKHOUSE_VERIFY,
settings={"max_result_rows": "2000"},
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
else:
client = pool.get_client()
return client


def run_query_on_shards(
query: str,
params: Dict[str, str | int] = {},
@@ -38,24 +55,13 @@ def run_query_on_shards(
for shard, node in nodes:
params["shard"] = shard
final_query = query % (params or {}) if substitute_params else query
client = Client(
host=node["host_address"],
database=settings.CLICKHOUSE_DATABASE,
user=settings.CLICKHOUSE_USER,
secure=settings.CLICKHOUSE_SECURE,
ca_certs=settings.CLICKHOUSE_CA,
verify=settings.CLICKHOUSE_VERIFY,
settings={"max_result_rows": "2000"},
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
client = get_client(node)
result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]

response.append(item)
responses.append((shard, response))
return response
@@ -68,7 +74,7 @@ def run_query(
query_id: Optional[str] = None,
use_cache: bool = True, # defaulting to True for now for simplicity, but ideally we should default this to False
substitute_params: bool = True,
cluster: Optional[str] = None,
node: Optional[Dict] = None,
):
final_query = query % (params or {}) if substitute_params else query
query_hash = ""
@@ -79,18 +85,24 @@
if cached_result:
return json.loads(cached_result)

with pool.get_client() as client:
response = []
if node:
client = get_client(node)
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]
else:
with pool.get_client() as client:
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)

response.append(item)
if use_cache:
cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)
return response
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]
response.append(item)

if use_cache:
cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)

return response


existing_system_tables = [row["name"] for row in run_query(EXISTING_TABLES_SQL, use_cache=False)]