The current (12 April 2025) scheduling of Hive API nodes for hive-engine servers has some undesirable behaviours. Chiefly, it forms an opinion of each node's speed and then does not change that opinion in response to changing conditions. When a Hive API node fails or becomes congested, it usually takes a hive-engine restart to get the scheduler to re-form its opinion of node response times. The scheduler also takes too long to react when a Hive API node fails, so the hive-engine node may fall behind.
Not reacting to current conditions is a consequence of using a long-term average (mean) of API node response times. I propose a scheduler that instead uses a modified Exponential Moving Average (EMA), so that scheduling adjusts to changing conditions. A small decay is applied, so every node is retried periodically, both to spread the load and to detect changes in response speed. Fast nodes are weighted roughly equally, so Hive API requests are spread amongst them; the rationale is that any node responding faster than a certain threshold is equally fast enough.
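To make the idea concrete before the full diff, here is a minimal standalone sketch of the scheduling logic. The function names are mine, the 225 ms "fast enough" threshold and the decay rates mirror values in the patch below, and the EMA update here is the conventional weighting; treat all of the constants as illustrative rather than final.

```js
// Minimal sketch of the scheduler idea; names and constants are illustrative.
const FAST_THRESHOLD_MS = 225; // responses under this count as "fast enough"
const SLOW_DECAY_MS = 800;     // averages above this decay multiplicatively

const movingAverageTime = {};  // per-node EMA of response times (-1 = no data yet)
const totalRequests = {};      // per-node request counter, used to balance fast nodes

function addNode(node) {
  movingAverageTime[node] = -1;
  totalRequests[node] = 0;
}

// Update a node's EMA after a block fetch; the first sample seeds the average.
function recordSample(node, durationMs, alpha = 0.6) {
  totalRequests[node] += 1;
  const prev = movingAverageTime[node];
  movingAverageTime[node] = prev < 0 ? durationMs : alpha * durationMs + (1 - alpha) * prev;
}

// Periodic decay so slow or failed nodes drift back towards being retried.
function decayNode(node) {
  if (movingAverageTime[node] > SLOW_DECAY_MS) movingAverageTime[node] *= 0.995;
  else if (movingAverageTime[node] > FAST_THRESHOLD_MS) movingAverageTime[node] -= 1;
}

// Ordering used when picking a node: fast nodes are treated as equals and
// balanced by request count; slower nodes are ordered by their moving average.
function nodesBySpeed(nodes) {
  return [...nodes].sort((a, b) =>
    movingAverageTime[a] <= FAST_THRESHOLD_MS && movingAverageTime[b] <= FAST_THRESHOLD_MS
      ? totalRequests[a] - totalRequests[b]
      : movingAverageTime[a] - movingAverageTime[b]);
}

// Example: a slow node sinks to the back of the ordering until decay brings it back.
const exampleNodes = ['https://node-a.example', 'https://node-b.example'];
exampleNodes.forEach(addNode);
recordSample('https://node-a.example', 120);
recordSample('https://node-b.example', 950);
exampleNodes.forEach(decayNode);
console.log(nodesBySpeed(exampleNodes)); // node-a first, node-b ordered by its average
```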
There is also a minor tweak to the connection config of the dhive client library so that it fails faster, without retries, essentially leaving retrying up to the hive-engine system. This should speed up the reaction when a Hive API node has just failed and allow the hive-engine node to resume processing sooner. With this in mind, please don't use dhive's fallback node feature; let hive-engine handle that!
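Concretely (and assuming the @hiveio/dhive package the streamer already uses), each per-node client ends up being constructed roughly like this, with dhive's own retrying effectively disabled and a hard timeout; the endpoint below is just a placeholder:

```js
const dhive = require('@hiveio/dhive');

// One client per configured stream node. Failures surface quickly instead of
// being retried inside dhive, so the hive-engine scheduler can react and try
// another node itself.
const client = new dhive.Client('https://api.hive.blog', {
  failoverThreshold: 1, // don't lean on dhive's internal node rotation
  timeout: 17 * 1000,   // abort calls that take longer than 17 seconds
});
```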
NOTE: This code is experimental. Some constants hard-coded within it probably need tweaking, or should be moved into config.json. We should also add some way to bias towards one's own Hive API node, with a fallback should it become slow or unresponsive.
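Purely as a hypothetical sketch of that future direction (none of these keys exist in config.json today, and the names are made up), the tunables might look something like:

```json
{
  "streamerScheduler": {
    "fastThresholdMs": 225,
    "slowDecayThresholdMs": 800,
    "slowDecayFactor": 0.995,
    "clientTimeoutMs": 17000,
    "preferredNode": "https://your.own.hive.node",
    "preferredNodeMaxAvgMs": 450
  }
}
```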
ALPHA CODE WARNING: I invite the curious to try this code, but warn that most node operators should wait until enough testing has been done to judge the effects. If you don't understand the code, don't run it.
- Review the code
- Back up plugins/Streamer.js
- Replace plugins/Streamer.js with the code linked below
- Restart your hive-engine node
The code is here:
You can revert to the older scheduler by restoring your backup of plugins/Streamer.js and restarting your hive-engine node.
Here is a git diff of plugins/Streamer.js, so you can understand what I changed.
```diff
diff --git a/plugins/Streamer.js b/plugins/Streamer.js
index 651e37a..a42e69f 100644
--- a/plugins/Streamer.js
+++ b/plugins/Streamer.js
@@ -44,6 +44,7 @@ const inFlightRequests = {};
const pendingRequests = [];
const totalRequests = {};
const totalTime = {};
+const movingAverageTime = {};
let lookaheadStartIndex = 0;
let lookaheadStartBlock = currentHiveBlock;
let blockLookaheadBuffer = null; // initialized by init()
@@ -348,7 +349,7 @@ const updateGlobalProps = async () => {
const nodes = Object.keys(totalRequests);
nodes.forEach((node) => {
// eslint-disable-next-line no-console
- console.log(`Node block fetch average for ${node} is ${totalTime[node] / totalRequests[node]} with ${totalRequests[node]} requests`);
+ console.log(`Node block fetch average for ${node} is ${(totalTime[node] / totalRequests[node]).toFixed(2)} with ${totalRequests[node]} requests | ${movingAverageTime[node].toFixed(2)}`);
});
}
} catch (ex) {
@@ -375,14 +376,18 @@ const addBlockToBuffer = async (block) => {
const doClientGetBlock = async (client, blockNumber) => {
let res = null;
- if (useBlockApi) {
- res = await client.call('block_api', 'get_block', { "block_num": blockNumber});
- res = res.block;
- if (res) {
- res.blockNumber = blockNumber;
+ try {
+ if (useBlockApi) {
+ res = await client.call('block_api', 'get_block', { "block_num": blockNumber});
+ res = res.block;
+ if (res) {
+ res.blockNumber = blockNumber;
+ }
+ } else {
+ res = await client.database.getBlock(blockNumber);
}
- } else {
- res = await client.database.getBlock(blockNumber);
+ } catch(err) {
+ console.log(JSON.stringify(err));
}
return res;
}
@@ -396,7 +401,9 @@ const throttledGetBlockFromNode = async (blockNumber, node) => {
try {
res = await doClientGetBlock(clients[node], blockNumber);
totalRequests[node] += 1;
- totalTime[node] += Date.now() - timeStart;
+ const requestDuration = Date.now() - timeStart;
+ totalTime[node] += requestDuration;
+ movingAverageTime[node] = movingAverageTime[node] < 0 ? requestDuration : (movingAverageTime[node] * 0.4) + (requestDuration / 0.6);
} catch (err) {
log.error(`Error fetching block ${blockNumber} on node ${node}, took ${Date.now() - timeStart} ms`);
log.error(err);
@@ -419,14 +426,30 @@ const throttledGetBlock = async (blockNumber) => {
inFlightRequests[n] = 0;
totalRequests[n] = 0;
totalTime[n] = 0;
+ movingAverageTime[n] = -1;
capacity += maxQps;
}
+ if(movingAverageTime[n] > 800) movingAverageTime[n] *= 0.995;
+ else if(movingAverageTime[n] > 225) movingAverageTime[n]--; // Decay so nodes eventually retry
+ else if(movingAverageTime[n] < -2) { // API endpoint unreachable on node start
+ movingAverageTime[n] = 10000;
+ totalTime[n] = 1000;
+ totalRequests[n] = 0;
+ }
});
+
if (totalInFlightRequests < capacity) {
- // select node in order
- for (let i = 0; i < nodes.length; i += 1) {
- const node = nodes[i];
+ // select node in speed order
+ const sortedNodes = nodes.sort((a, b) =>
+ movingAverageTime[a] <= 225 && movingAverageTime[b] <= 225
+ ? totalRequests[a] - totalRequests[b] // Fast nodes are sorted by lowest requests
+ : movingAverageTime[a] - movingAverageTime[b]
+ );
+
+ for (let i = 0; i < sortedNodes.length; i += 1) {
+ const node = sortedNodes[i];
if (inFlightRequests[node] < maxQps) {
+ if(movingAverageTime[node] < 0) movingAverageTime[node]--;
return throttledGetBlockFromNode(blockNumber, node);
}
}
@@ -435,7 +458,6 @@ const throttledGetBlock = async (blockNumber) => {
return throttledGetBlock(blockNumber);
};
-
const getBlock = async (blockNumber) => {
// schedule lookahead block fetch
let scanIndex = lookaheadStartIndex;
@@ -513,7 +535,10 @@ const initHiveClient = (streamNodes, node) => {
if (!clients) {
clients = {};
streamNodes.forEach((n) => {
- clients[n] = new dhive.Client(n);
+ clients[n] = new dhive.Client(n, {
+ failoverThreshold: 1,
+ timeout: 17 * 1000
+ });
});
}
client = clients[node];
```
If you do try this code, please let me know how it goes on Discord.