feat: retry failed 5xx requests
authorKyle Fuller <redacted>
Fri, 23 Sep 2022 21:53:11 +0000 (22:53 +0100)
committerKyle Fuller <redacted>
Fri, 23 Sep 2022 21:53:47 +0000 (22:53 +0100)
CHANGELOG.md
palaver.cpp
test/test_palaver.py

index e61288ff70c30df094b765542a6d7a373d5dc45c..e1657752420daad144736a42b3b72ca9d177a8ac 100644 (file)
@@ -1,5 +1,9 @@
 # Changelog for Palaver ZNC Module
 
+## TBD
+
+- Retry sending push notification on failure.
+
 ## 1.2.1 (18/07/2020)
 
 ### Bug Fixes
index 7bf6972a38b0dca10a802e02cdc3a3dd74c8cdf7..520aaa5dec6eb2c02f0907c53860add1cf97437e 100644 (file)
@@ -9,6 +9,9 @@
 
 #define ZNC_PALAVER_VERSION "1.2.1"
 
+#include <algorithm>
+#include <cmath>
+
 #include <znc/Modules.h>
 #include <znc/User.h>
 #include <znc/IRCNetwork.h>
@@ -112,6 +115,24 @@ struct PLVHTTPRequest : PLVHTTPMessage {
        };
 };
 
+class RetryStrategy {
+public:
+       bool ShouldRetryRequest(unsigned int status) {
+               bool is5xx = status >= 500 && status <= 600;
+               return is5xx;
+       }
+
+       unsigned int GetMaximumRetryAttempts() {
+               return 5;
+       }
+
+       unsigned int GetDelay(unsigned int uAttempts) {
+               double minimumBackoff = 1;
+               double maximumBackoff = 10;
+               return std::max(std::min(pow((double)uAttempts, 2), maximumBackoff), minimumBackoff);
+       }
+};
+
 typedef enum {
        StatusLine = 0,
        Headers = 1,
@@ -123,6 +144,9 @@ class PLVHTTPSocket : public CSocket {
        EPLVHTTPSocketState m_eState;
 
 public:
+       std::shared_ptr<PLVHTTPRequest> m_request;
+       unsigned int m_attempts;
+
        PLVHTTPSocket(CModule *pModule, PLVURL url) : CSocket(pModule) {
                m_eState = StatusLine;
                m_sHostname = url.host;
@@ -135,13 +159,17 @@ public:
                EnableReadLine();
        }
 
-       void Send(PLVHTTPRequest &request) {
-               Write(request.method + " " + request.url.path + " HTTP/1.1\r\n");
-               Write("Host: " + request.url.host + "\r\n");
+       void Send(std::shared_ptr<PLVHTTPRequest> request, unsigned int attempts) {
+               m_eState = StatusLine;
+               m_request = request;
+               m_attempts = attempts;
+
+               Write(request.get()->method + " " + request.get()->url.path + " HTTP/1.1\r\n");
+               Write("Host: " + request.get()->url.host + "\r\n");
                Write("Connection: close\r\n");
-               Write("Content-Length: " + CString(request.body.length()) + "\r\n");
+               Write("Content-Length: " + CString(request.get()->body.length()) + "\r\n");
 
-               for (MCString::const_iterator it = request.headers.begin(); it != request.headers.end(); ++it) {
+               for (MCString::const_iterator it = request.get()->headers.begin(); it != request.get()->headers.end(); ++it) {
                        const CString &sKey = it->first;
                        const CString &sValue = it->second;
 
@@ -150,8 +178,8 @@ public:
 
                Write("\r\n");
 
-               if (request.body.length() > 0) {
-                       Write(request.body);
+               if (request.get()->body.length() > 0) {
+                       Write(request.get()->body);
                }
        }
 
@@ -790,9 +818,9 @@ public:
                mcsHeaders["Content-Type"] = "application/json";
                mcsHeaders["User-Agent"] = "znc-palaver/" + CString(ZNC_PALAVER_VERSION) + " znc/" + CZNC::GetVersion();
 
-               PLVHTTPRequest request = PLVHTTPRequest(GetPushURL(), "POST", mcsHeaders, sJSONBody);
+               std::shared_ptr<PLVHTTPRequest> request = std::make_shared<PLVHTTPRequest>(GetPushURL(), "POST", mcsHeaders, sJSONBody);
                PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(&module, token, GetPushURL());
-               pSocket->Send(request);
+               pSocket->Send(request, 0);
                module.AddSocket(pSocket);
        }
 
@@ -1060,6 +1088,12 @@ public:
                return false;
        }
 
+       void SendRequest(const CString &sIdentifier, std::shared_ptr<PLVHTTPRequest> request, unsigned int attempts) {
+               PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(this, sIdentifier, request.get()->url);
+               pSocket->Send(request, attempts);
+               AddSocket(pSocket);
+       }
+
 #pragma mark - Serialization
 
        CString GetConfigPath() const {
@@ -1321,12 +1355,54 @@ private:
        std::vector<CDevice*> m_vDevices;
 };
 
+class PLVRetryTimer : public CTimer {
+  public:
+       PLVRetryTimer(
+               CModule* pModule,
+               unsigned int uDelay,
+               const CString& sLabel,
+               const CString& sDescription,
+               const CString& sIdentifier,
+               std::shared_ptr<PLVHTTPRequest> request,
+               unsigned int uAttempts
+       ) : CTimer(pModule, uDelay, 1, sLabel, sDescription)
+       {
+               m_sIdentifier = sIdentifier;
+               m_request = request;
+               m_uAttempts = uAttempts;
+       }
+       ~PLVRetryTimer() override {}
+
+  private:
+       CString m_sIdentifier;
+       std::shared_ptr<PLVHTTPRequest> m_request;
+       unsigned int m_uAttempts;
+
+  protected:
+       void RunJob() override {
+               if (CPalaverMod *pModule = dynamic_cast<CPalaverMod *>(m_pModule)) {
+                       pModule->SendRequest(m_sIdentifier, m_request, m_uAttempts);
+               }
+       }
+};
+
+
 void PLVHTTPNotificationSocket::HandleStatusCode(unsigned int status) {
        if (status == 401 || status == 404) {
                if (CPalaverMod *pModule = dynamic_cast<CPalaverMod *>(m_pModule)) {
                        DEBUG("palaver: Removing device");
                        pModule->RemoveDeviceWithIdentifier(m_sIdentifier);
                }
+               return;
+       }
+
+       RetryStrategy retryStrategy = RetryStrategy();
+       if (retryStrategy.ShouldRetryRequest(status) && retryStrategy.GetMaximumRetryAttempts() > (m_attempts + 1)) {
+               DEBUG("palaver: Retrying failed request");
+               m_pModule->AddTimer(
+                       new PLVRetryTimer(m_pModule, retryStrategy.GetDelay(m_attempts + 1), "Request Retry", "Retry a failed pysh notification", m_sIdentifier, m_request, m_attempts + 1)
+               );
+               return;
        }
 }
 
index 71dc1ec741e984b057f5946e51ef6c92ce58d2db..9c2710f4f50dfec931290d14ecc40f85a5e63379 100644 (file)
@@ -1,10 +1,10 @@
 import asyncio
 import json
 import os
-import time
-from typing import Dict
+from typing import Dict, Tuple, Optional
 
 import pytest
+import pytest_asyncio
 from semantic_version import Version
 
 
@@ -46,6 +46,21 @@ async def read_headers(reader) -> Dict[str, str]:
         headers[name] = value
 
 
+async def read_push_request(reader) -> Tuple[Dict[str, str], bytes]:
+    request_line = await reader.readline()
+    assert request_line == b'POST /push HTTP/1.1\r\n'
+
+    headers = await read_headers(reader)
+    assert headers['Host'] == '127.0.0.1'
+    assert headers['Connection'] == 'close'
+    assert headers['Content-Type'] == 'application/json'
+    await assert_user_agent(headers['User-Agent'])
+
+    assert 'Content-Length' in headers
+    body = await reader.read(int(headers['Content-Length']))
+    return headers, body
+
+
 async def assert_user_agent(user_agent):
     products = user_agent.split(' ')
     assert len(products) == 2
@@ -64,7 +79,7 @@ async def setUp():
     allow_root = ' --allow-root' if running_as_root else ''
 
     proc = await asyncio.create_subprocess_shell(f'znc -d test/fixtures --foreground --debug{allow_root}')
-    time.sleep(31 if running_as_root else 1)
+    await asyncio.sleep(31 if running_as_root else 1)
 
     (reader, writer) = await asyncio.open_connection('localhost', 6698)
     writer.write(b'CAP LS 302\r\n')
@@ -98,8 +113,15 @@ async def tearDown(proc):
         os.remove(config)
 
 
-async def test_registering_device():
-    (proc, reader, writer) = await setUp()
+@pytest_asyncio.fixture
+async def znc():
+    proc, reader, writer = await setUp()
+    yield (reader, writer)
+    await tearDown(proc)
+
+
+async def test_registering_device(znc):
+    reader, writer = znc
 
     writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
     await writer.drain()
@@ -115,8 +137,6 @@ async def test_registering_device():
     writer.write(b'PALAVER END\r\n')
     await writer.drain()
 
-    await tearDown(proc)
-
 
 async def test_loading_module_new_cap():
     await requires_znc_version('1.7.0')
@@ -161,23 +181,13 @@ async def test_unloading_module_del_cap():
     await tearDown(proc)
 
 
-async def test_receiving_notification():
-    (proc, reader, writer) = await setUp()
+async def test_receiving_notification(znc):
+    reader, writer = znc
 
     async def connected(reader, writer):
-        line = await reader.readline()
-        assert line == b'POST /push HTTP/1.1\r\n'
-
-        headers = await read_headers(reader)
-        assert headers['Host'] == '127.0.0.1'
+        headers, body = await read_push_request(reader)
         assert headers['Authorization'] == 'Bearer 9167e47b01598af7423e2ecd3d0a3ec4'
-        assert headers['Connection'] == 'close'
-        assert headers['Content-Length'] == '109'
-        assert headers['Content-Type'] == 'application/json'
-        await assert_user_agent(headers['User-Agent'])
-
-        line = await reader.readline()
-        assert json.loads(line.decode('utf-8')) == {
+        assert json.loads(body.decode('utf-8')) == {
             'badge': 1,
             'message': 'Test notification',
             'sender': 'palaver',
@@ -193,7 +203,7 @@ async def test_receiving_notification():
     server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
     await asyncio.sleep(0.2)
     addr = server.sockets[0].getsockname()
-    url = f'Serving on http://{addr[0]}:{addr[1]}/push'
+    url = f'http://{addr[0]}:{addr[1]}/push'
 
     writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
     await writer.drain()
@@ -213,28 +223,14 @@ async def test_receiving_notification():
     server.close()
     await server.wait_closed()
 
-    await tearDown(proc)
-
     assert connected.called
 
 
-async def test_receiving_notification_with_push_token():
-    (proc, reader, writer) = await setUp()
-
+async def test_receiving_notification_with_push_token(znc):
     async def connected(reader, writer):
-        line = await reader.readline()
-        assert line == b'POST /push HTTP/1.1\r\n'
-
-        headers = await read_headers(reader)
-        assert headers['Host'] == '127.0.0.1'
+        headers, body = await read_push_request(reader)
         assert headers['Authorization'] == 'Bearer abcdefg'
-        assert headers['Connection'] == 'close'
-        assert headers['Content-Length'] == '109'
-        assert headers['Content-Type'] == 'application/json'
-        await assert_user_agent(headers['User-Agent'])
-
-        line = await reader.readline()
-        assert json.loads(line.decode('utf-8')) == {
+        assert json.loads(body.decode('utf-8')) == {
             'badge': 1,
             'message': 'Test notification',
             'sender': 'palaver',
@@ -250,8 +246,9 @@ async def test_receiving_notification_with_push_token():
     server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
     await asyncio.sleep(0.2)
     addr = server.sockets[0].getsockname()
-    url = f'Serving on http://{addr[0]}:{addr[1]}/push'
+    url = f'http://{addr[0]}:{addr[1]}/push'
 
+    reader, writer = znc
     writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
     await writer.drain()
 
@@ -271,6 +268,60 @@ async def test_receiving_notification_with_push_token():
     server.close()
     await server.wait_closed()
 
-    await tearDown(proc)
-
     assert connected.called
+
+
+async def test_receiving_notification_with_retry_on_server_error(znc):
+    reader, writer = znc
+
+    async def connected(reader, writer):
+        headers, body = await read_push_request(reader)
+        assert headers['Authorization'] == 'Bearer abcdefg'
+        assert json.loads(body.decode('utf-8')) == {
+            'badge': 1,
+            'message': 'Test notification',
+            'sender': 'palaver',
+            'network': 'b758eaab1a4611a310642a6e8419fbff'
+        }
+
+        if not hasattr(connected, 'requests'):
+            connected.requests = 1
+            writer.write(b'HTTP/1.1 503 Service Unavailable\r\n')
+        else:
+            connected.requests += 1
+            writer.write(b'HTTP/1.1 204 No Content\r\n')
+
+        writer.write(b'Connection: close\r\n')
+        writer.write(b'\r\n')
+
+        await writer.drain()
+        writer.close()
+
+    server = await asyncio.start_server(connected, host='127.0.0.1', port=8121)
+    await asyncio.sleep(0.2)
+    addr = server.sockets[0].getsockname()
+    url = f'http://{addr[0]}:{addr[1]}/push'
+
+    writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
+    await writer.drain()
+
+    line = await reader.readline()
+    assert line == b'PALAVER REQ *\r\n'
+
+    writer.write(b'PALAVER BEGIN 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e\r\n')
+    writer.write(f'PALAVER SET PUSH-ENDPOINT {url}\r\n'.encode('utf-8'))
+    writer.write(f'PALAVER SET PUSH-TOKEN abcdefg\r\n'.encode('utf-8'))
+    writer.write(b'PALAVER END\r\n')
+    await writer.drain()
+
+    writer.write(b'PRIVMSG *palaver :test\r\n')
+    await writer.drain()
+
+    line = await reader.readline()
+    assert line == b':*palaver!znc@znc.in PRIVMSG admin :Notification sent to 1 clients.\r\n'
+
+    await asyncio.sleep(1.2)
+    server.close()
+    await server.wait_closed()
+
+    assert connected.requests == 2
git clone https://git.99rst.org/PROJECT