From: Kyle Fuller Date: Fri, 23 Sep 2022 21:53:11 +0000 (+0100) Subject: feat: retry failed 5xx requests X-Git-Url: http://git.99rst.org/?a=commitdiff_plain;h=ea55583be032f777e599ac91dab989d110e07217;p=znc-palaver.git feat: retry failed 5xx requests --- diff --git a/CHANGELOG.md b/CHANGELOG.md index e61288f..e165775 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog for Palaver ZNC Module +## TBD + +- Retry sending push notification on failure. + ## 1.2.1 (18/07/2020) ### Bug Fixes diff --git a/palaver.cpp b/palaver.cpp index 7bf6972..520aaa5 100644 --- a/palaver.cpp +++ b/palaver.cpp @@ -9,6 +9,9 @@ #define ZNC_PALAVER_VERSION "1.2.1" +#include +#include + #include #include #include @@ -112,6 +115,24 @@ struct PLVHTTPRequest : PLVHTTPMessage { }; }; +class RetryStrategy { +public: + bool ShouldRetryRequest(unsigned int status) { + bool is5xx = status >= 500 && status <= 600; + return is5xx; + } + + unsigned int GetMaximumRetryAttempts() { + return 5; + } + + unsigned int GetDelay(unsigned int uAttempts) { + double minimumBackoff = 1; + double maximumBackoff = 10; + return std::max(std::min(pow((double)uAttempts, 2), maximumBackoff), minimumBackoff); + } +}; + typedef enum { StatusLine = 0, Headers = 1, @@ -123,6 +144,9 @@ class PLVHTTPSocket : public CSocket { EPLVHTTPSocketState m_eState; public: + std::shared_ptr m_request; + unsigned int m_attempts; + PLVHTTPSocket(CModule *pModule, PLVURL url) : CSocket(pModule) { m_eState = StatusLine; m_sHostname = url.host; @@ -135,13 +159,17 @@ public: EnableReadLine(); } - void Send(PLVHTTPRequest &request) { - Write(request.method + " " + request.url.path + " HTTP/1.1\r\n"); - Write("Host: " + request.url.host + "\r\n"); + void Send(std::shared_ptr request, unsigned int attempts) { + m_eState = StatusLine; + m_request = request; + m_attempts = attempts; + + Write(request.get()->method + " " + request.get()->url.path + " HTTP/1.1\r\n"); + Write("Host: " + request.get()->url.host + "\r\n"); Write("Connection: close\r\n"); - Write("Content-Length: " + CString(request.body.length()) + "\r\n"); + Write("Content-Length: " + CString(request.get()->body.length()) + "\r\n"); - for (MCString::const_iterator it = request.headers.begin(); it != request.headers.end(); ++it) { + for (MCString::const_iterator it = request.get()->headers.begin(); it != request.get()->headers.end(); ++it) { const CString &sKey = it->first; const CString &sValue = it->second; @@ -150,8 +178,8 @@ public: Write("\r\n"); - if (request.body.length() > 0) { - Write(request.body); + if (request.get()->body.length() > 0) { + Write(request.get()->body); } } @@ -790,9 +818,9 @@ public: mcsHeaders["Content-Type"] = "application/json"; mcsHeaders["User-Agent"] = "znc-palaver/" + CString(ZNC_PALAVER_VERSION) + " znc/" + CZNC::GetVersion(); - PLVHTTPRequest request = PLVHTTPRequest(GetPushURL(), "POST", mcsHeaders, sJSONBody); + std::shared_ptr request = std::make_shared(GetPushURL(), "POST", mcsHeaders, sJSONBody); PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(&module, token, GetPushURL()); - pSocket->Send(request); + pSocket->Send(request, 0); module.AddSocket(pSocket); } @@ -1060,6 +1088,12 @@ public: return false; } + void SendRequest(const CString &sIdentifier, std::shared_ptr request, unsigned int attempts) { + PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(this, sIdentifier, request.get()->url); + pSocket->Send(request, attempts); + AddSocket(pSocket); + } + #pragma mark - Serialization CString GetConfigPath() const { @@ -1321,12 +1355,54 @@ private: std::vector m_vDevices; }; +class PLVRetryTimer : public CTimer { + public: + PLVRetryTimer( + CModule* pModule, + unsigned int uDelay, + const CString& sLabel, + const CString& sDescription, + const CString& sIdentifier, + std::shared_ptr request, + unsigned int uAttempts + ) : CTimer(pModule, uDelay, 1, sLabel, sDescription) + { + m_sIdentifier = sIdentifier; + m_request = request; + m_uAttempts = uAttempts; + } + ~PLVRetryTimer() override {} + + private: + CString m_sIdentifier; + std::shared_ptr m_request; + unsigned int m_uAttempts; + + protected: + void RunJob() override { + if (CPalaverMod *pModule = dynamic_cast(m_pModule)) { + pModule->SendRequest(m_sIdentifier, m_request, m_uAttempts); + } + } +}; + + void PLVHTTPNotificationSocket::HandleStatusCode(unsigned int status) { if (status == 401 || status == 404) { if (CPalaverMod *pModule = dynamic_cast(m_pModule)) { DEBUG("palaver: Removing device"); pModule->RemoveDeviceWithIdentifier(m_sIdentifier); } + return; + } + + RetryStrategy retryStrategy = RetryStrategy(); + if (retryStrategy.ShouldRetryRequest(status) && retryStrategy.GetMaximumRetryAttempts() > (m_attempts + 1)) { + DEBUG("palaver: Retrying failed request"); + m_pModule->AddTimer( + new PLVRetryTimer(m_pModule, retryStrategy.GetDelay(m_attempts + 1), "Request Retry", "Retry a failed pysh notification", m_sIdentifier, m_request, m_attempts + 1) + ); + return; } } diff --git a/test/test_palaver.py b/test/test_palaver.py index 71dc1ec..9c2710f 100644 --- a/test/test_palaver.py +++ b/test/test_palaver.py @@ -1,10 +1,10 @@ import asyncio import json import os -import time -from typing import Dict +from typing import Dict, Tuple, Optional import pytest +import pytest_asyncio from semantic_version import Version @@ -46,6 +46,21 @@ async def read_headers(reader) -> Dict[str, str]: headers[name] = value +async def read_push_request(reader) -> Tuple[Dict[str, str], bytes]: + request_line = await reader.readline() + assert request_line == b'POST /push HTTP/1.1\r\n' + + headers = await read_headers(reader) + assert headers['Host'] == '127.0.0.1' + assert headers['Connection'] == 'close' + assert headers['Content-Type'] == 'application/json' + await assert_user_agent(headers['User-Agent']) + + assert 'Content-Length' in headers + body = await reader.read(int(headers['Content-Length'])) + return headers, body + + async def assert_user_agent(user_agent): products = user_agent.split(' ') assert len(products) == 2 @@ -64,7 +79,7 @@ async def setUp(): allow_root = ' --allow-root' if running_as_root else '' proc = await asyncio.create_subprocess_shell(f'znc -d test/fixtures --foreground --debug{allow_root}') - time.sleep(31 if running_as_root else 1) + await asyncio.sleep(31 if running_as_root else 1) (reader, writer) = await asyncio.open_connection('localhost', 6698) writer.write(b'CAP LS 302\r\n') @@ -98,8 +113,15 @@ async def tearDown(proc): os.remove(config) -async def test_registering_device(): - (proc, reader, writer) = await setUp() +@pytest_asyncio.fixture +async def znc(): + proc, reader, writer = await setUp() + yield (reader, writer) + await tearDown(proc) + + +async def test_registering_device(znc): + reader, writer = znc writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n') await writer.drain() @@ -115,8 +137,6 @@ async def test_registering_device(): writer.write(b'PALAVER END\r\n') await writer.drain() - await tearDown(proc) - async def test_loading_module_new_cap(): await requires_znc_version('1.7.0') @@ -161,23 +181,13 @@ async def test_unloading_module_del_cap(): await tearDown(proc) -async def test_receiving_notification(): - (proc, reader, writer) = await setUp() +async def test_receiving_notification(znc): + reader, writer = znc async def connected(reader, writer): - line = await reader.readline() - assert line == b'POST /push HTTP/1.1\r\n' - - headers = await read_headers(reader) - assert headers['Host'] == '127.0.0.1' + headers, body = await read_push_request(reader) assert headers['Authorization'] == 'Bearer 9167e47b01598af7423e2ecd3d0a3ec4' - assert headers['Connection'] == 'close' - assert headers['Content-Length'] == '109' - assert headers['Content-Type'] == 'application/json' - await assert_user_agent(headers['User-Agent']) - - line = await reader.readline() - assert json.loads(line.decode('utf-8')) == { + assert json.loads(body.decode('utf-8')) == { 'badge': 1, 'message': 'Test notification', 'sender': 'palaver', @@ -193,7 +203,7 @@ async def test_receiving_notification(): server = await asyncio.start_server(connected, host='127.0.0.1', port=0) await asyncio.sleep(0.2) addr = server.sockets[0].getsockname() - url = f'Serving on http://{addr[0]}:{addr[1]}/push' + url = f'http://{addr[0]}:{addr[1]}/push' writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n') await writer.drain() @@ -213,28 +223,14 @@ async def test_receiving_notification(): server.close() await server.wait_closed() - await tearDown(proc) - assert connected.called -async def test_receiving_notification_with_push_token(): - (proc, reader, writer) = await setUp() - +async def test_receiving_notification_with_push_token(znc): async def connected(reader, writer): - line = await reader.readline() - assert line == b'POST /push HTTP/1.1\r\n' - - headers = await read_headers(reader) - assert headers['Host'] == '127.0.0.1' + headers, body = await read_push_request(reader) assert headers['Authorization'] == 'Bearer abcdefg' - assert headers['Connection'] == 'close' - assert headers['Content-Length'] == '109' - assert headers['Content-Type'] == 'application/json' - await assert_user_agent(headers['User-Agent']) - - line = await reader.readline() - assert json.loads(line.decode('utf-8')) == { + assert json.loads(body.decode('utf-8')) == { 'badge': 1, 'message': 'Test notification', 'sender': 'palaver', @@ -250,8 +246,9 @@ async def test_receiving_notification_with_push_token(): server = await asyncio.start_server(connected, host='127.0.0.1', port=0) await asyncio.sleep(0.2) addr = server.sockets[0].getsockname() - url = f'Serving on http://{addr[0]}:{addr[1]}/push' + url = f'http://{addr[0]}:{addr[1]}/push' + reader, writer = znc writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n') await writer.drain() @@ -271,6 +268,60 @@ async def test_receiving_notification_with_push_token(): server.close() await server.wait_closed() - await tearDown(proc) - assert connected.called + + +async def test_receiving_notification_with_retry_on_server_error(znc): + reader, writer = znc + + async def connected(reader, writer): + headers, body = await read_push_request(reader) + assert headers['Authorization'] == 'Bearer abcdefg' + assert json.loads(body.decode('utf-8')) == { + 'badge': 1, + 'message': 'Test notification', + 'sender': 'palaver', + 'network': 'b758eaab1a4611a310642a6e8419fbff' + } + + if not hasattr(connected, 'requests'): + connected.requests = 1 + writer.write(b'HTTP/1.1 503 Service Unavailable\r\n') + else: + connected.requests += 1 + writer.write(b'HTTP/1.1 204 No Content\r\n') + + writer.write(b'Connection: close\r\n') + writer.write(b'\r\n') + + await writer.drain() + writer.close() + + server = await asyncio.start_server(connected, host='127.0.0.1', port=8121) + await asyncio.sleep(0.2) + addr = server.sockets[0].getsockname() + url = f'http://{addr[0]}:{addr[1]}/push' + + writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n') + await writer.drain() + + line = await reader.readline() + assert line == b'PALAVER REQ *\r\n' + + writer.write(b'PALAVER BEGIN 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e\r\n') + writer.write(f'PALAVER SET PUSH-ENDPOINT {url}\r\n'.encode('utf-8')) + writer.write(f'PALAVER SET PUSH-TOKEN abcdefg\r\n'.encode('utf-8')) + writer.write(b'PALAVER END\r\n') + await writer.drain() + + writer.write(b'PRIVMSG *palaver :test\r\n') + await writer.drain() + + line = await reader.readline() + assert line == b':*palaver!znc@znc.in PRIVMSG admin :Notification sent to 1 clients.\r\n' + + await asyncio.sleep(1.2) + server.close() + await server.wait_closed() + + assert connected.requests == 2