#define ZNC_PALAVER_VERSION "1.2.1"
+#include <algorithm>
+#include <cmath>
+
#include <znc/Modules.h>
#include <znc/User.h>
#include <znc/IRCNetwork.h>
};
};
+class RetryStrategy {
+public:
+ bool ShouldRetryRequest(unsigned int status) {
+ bool is5xx = status >= 500 && status <= 600;
+ return is5xx;
+ }
+
+ unsigned int GetMaximumRetryAttempts() {
+ return 5;
+ }
+
+ unsigned int GetDelay(unsigned int uAttempts) {
+ double minimumBackoff = 1;
+ double maximumBackoff = 10;
+ return std::max(std::min(pow((double)uAttempts, 2), maximumBackoff), minimumBackoff);
+ }
+};
+
typedef enum {
StatusLine = 0,
Headers = 1,
EPLVHTTPSocketState m_eState;
public:
+ std::shared_ptr<PLVHTTPRequest> m_request;
+ unsigned int m_attempts;
+
PLVHTTPSocket(CModule *pModule, PLVURL url) : CSocket(pModule) {
m_eState = StatusLine;
m_sHostname = url.host;
EnableReadLine();
}
- void Send(PLVHTTPRequest &request) {
- Write(request.method + " " + request.url.path + " HTTP/1.1\r\n");
- Write("Host: " + request.url.host + "\r\n");
+ void Send(std::shared_ptr<PLVHTTPRequest> request, unsigned int attempts) {
+ m_eState = StatusLine;
+ m_request = request;
+ m_attempts = attempts;
+
+ Write(request.get()->method + " " + request.get()->url.path + " HTTP/1.1\r\n");
+ Write("Host: " + request.get()->url.host + "\r\n");
Write("Connection: close\r\n");
- Write("Content-Length: " + CString(request.body.length()) + "\r\n");
+ Write("Content-Length: " + CString(request.get()->body.length()) + "\r\n");
- for (MCString::const_iterator it = request.headers.begin(); it != request.headers.end(); ++it) {
+ for (MCString::const_iterator it = request.get()->headers.begin(); it != request.get()->headers.end(); ++it) {
const CString &sKey = it->first;
const CString &sValue = it->second;
Write("\r\n");
- if (request.body.length() > 0) {
- Write(request.body);
+ if (request.get()->body.length() > 0) {
+ Write(request.get()->body);
}
}
mcsHeaders["Content-Type"] = "application/json";
mcsHeaders["User-Agent"] = "znc-palaver/" + CString(ZNC_PALAVER_VERSION) + " znc/" + CZNC::GetVersion();
- PLVHTTPRequest request = PLVHTTPRequest(GetPushURL(), "POST", mcsHeaders, sJSONBody);
+ std::shared_ptr<PLVHTTPRequest> request = std::make_shared<PLVHTTPRequest>(GetPushURL(), "POST", mcsHeaders, sJSONBody);
PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(&module, token, GetPushURL());
- pSocket->Send(request);
+ pSocket->Send(request, 0);
module.AddSocket(pSocket);
}
return false;
}
+ void SendRequest(const CString &sIdentifier, std::shared_ptr<PLVHTTPRequest> request, unsigned int attempts) {
+ PLVHTTPSocket *pSocket = new PLVHTTPNotificationSocket(this, sIdentifier, request.get()->url);
+ pSocket->Send(request, attempts);
+ AddSocket(pSocket);
+ }
+
#pragma mark - Serialization
CString GetConfigPath() const {
std::vector<CDevice*> m_vDevices;
};
+class PLVRetryTimer : public CTimer {
+ public:
+ PLVRetryTimer(
+ CModule* pModule,
+ unsigned int uDelay,
+ const CString& sLabel,
+ const CString& sDescription,
+ const CString& sIdentifier,
+ std::shared_ptr<PLVHTTPRequest> request,
+ unsigned int uAttempts
+ ) : CTimer(pModule, uDelay, 1, sLabel, sDescription)
+ {
+ m_sIdentifier = sIdentifier;
+ m_request = request;
+ m_uAttempts = uAttempts;
+ }
+ ~PLVRetryTimer() override {}
+
+ private:
+ CString m_sIdentifier;
+ std::shared_ptr<PLVHTTPRequest> m_request;
+ unsigned int m_uAttempts;
+
+ protected:
+ void RunJob() override {
+ if (CPalaverMod *pModule = dynamic_cast<CPalaverMod *>(m_pModule)) {
+ pModule->SendRequest(m_sIdentifier, m_request, m_uAttempts);
+ }
+ }
+};
+
+
void PLVHTTPNotificationSocket::HandleStatusCode(unsigned int status) {
if (status == 401 || status == 404) {
if (CPalaverMod *pModule = dynamic_cast<CPalaverMod *>(m_pModule)) {
DEBUG("palaver: Removing device");
pModule->RemoveDeviceWithIdentifier(m_sIdentifier);
}
+ return;
+ }
+
+ RetryStrategy retryStrategy = RetryStrategy();
+ if (retryStrategy.ShouldRetryRequest(status) && retryStrategy.GetMaximumRetryAttempts() > (m_attempts + 1)) {
+ DEBUG("palaver: Retrying failed request");
+ m_pModule->AddTimer(
+ new PLVRetryTimer(m_pModule, retryStrategy.GetDelay(m_attempts + 1), "Request Retry", "Retry a failed pysh notification", m_sIdentifier, m_request, m_attempts + 1)
+ );
+ return;
}
}
import asyncio
import json
import os
-import time
-from typing import Dict
+from typing import Dict, Tuple, Optional
import pytest
+import pytest_asyncio
from semantic_version import Version
headers[name] = value
+async def read_push_request(reader) -> Tuple[Dict[str, str], bytes]:
+ request_line = await reader.readline()
+ assert request_line == b'POST /push HTTP/1.1\r\n'
+
+ headers = await read_headers(reader)
+ assert headers['Host'] == '127.0.0.1'
+ assert headers['Connection'] == 'close'
+ assert headers['Content-Type'] == 'application/json'
+ await assert_user_agent(headers['User-Agent'])
+
+ assert 'Content-Length' in headers
+ body = await reader.read(int(headers['Content-Length']))
+ return headers, body
+
+
async def assert_user_agent(user_agent):
products = user_agent.split(' ')
assert len(products) == 2
allow_root = ' --allow-root' if running_as_root else ''
proc = await asyncio.create_subprocess_shell(f'znc -d test/fixtures --foreground --debug{allow_root}')
- time.sleep(31 if running_as_root else 1)
+ await asyncio.sleep(31 if running_as_root else 1)
(reader, writer) = await asyncio.open_connection('localhost', 6698)
writer.write(b'CAP LS 302\r\n')
os.remove(config)
-async def test_registering_device():
- (proc, reader, writer) = await setUp()
+@pytest_asyncio.fixture
+async def znc():
+ proc, reader, writer = await setUp()
+ yield (reader, writer)
+ await tearDown(proc)
+
+
+async def test_registering_device(znc):
+ reader, writer = znc
writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
await writer.drain()
writer.write(b'PALAVER END\r\n')
await writer.drain()
- await tearDown(proc)
-
async def test_loading_module_new_cap():
await requires_znc_version('1.7.0')
await tearDown(proc)
-async def test_receiving_notification():
- (proc, reader, writer) = await setUp()
+async def test_receiving_notification(znc):
+ reader, writer = znc
async def connected(reader, writer):
- line = await reader.readline()
- assert line == b'POST /push HTTP/1.1\r\n'
-
- headers = await read_headers(reader)
- assert headers['Host'] == '127.0.0.1'
+ headers, body = await read_push_request(reader)
assert headers['Authorization'] == 'Bearer 9167e47b01598af7423e2ecd3d0a3ec4'
- assert headers['Connection'] == 'close'
- assert headers['Content-Length'] == '109'
- assert headers['Content-Type'] == 'application/json'
- await assert_user_agent(headers['User-Agent'])
-
- line = await reader.readline()
- assert json.loads(line.decode('utf-8')) == {
+ assert json.loads(body.decode('utf-8')) == {
'badge': 1,
'message': 'Test notification',
'sender': 'palaver',
server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
await asyncio.sleep(0.2)
addr = server.sockets[0].getsockname()
- url = f'Serving on http://{addr[0]}:{addr[1]}/push'
+ url = f'http://{addr[0]}:{addr[1]}/push'
writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
await writer.drain()
server.close()
await server.wait_closed()
- await tearDown(proc)
-
assert connected.called
-async def test_receiving_notification_with_push_token():
- (proc, reader, writer) = await setUp()
-
+async def test_receiving_notification_with_push_token(znc):
async def connected(reader, writer):
- line = await reader.readline()
- assert line == b'POST /push HTTP/1.1\r\n'
-
- headers = await read_headers(reader)
- assert headers['Host'] == '127.0.0.1'
+ headers, body = await read_push_request(reader)
assert headers['Authorization'] == 'Bearer abcdefg'
- assert headers['Connection'] == 'close'
- assert headers['Content-Length'] == '109'
- assert headers['Content-Type'] == 'application/json'
- await assert_user_agent(headers['User-Agent'])
-
- line = await reader.readline()
- assert json.loads(line.decode('utf-8')) == {
+ assert json.loads(body.decode('utf-8')) == {
'badge': 1,
'message': 'Test notification',
'sender': 'palaver',
server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
await asyncio.sleep(0.2)
addr = server.sockets[0].getsockname()
- url = f'Serving on http://{addr[0]}:{addr[1]}/push'
+ url = f'http://{addr[0]}:{addr[1]}/push'
+ reader, writer = znc
writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
await writer.drain()
server.close()
await server.wait_closed()
- await tearDown(proc)
-
assert connected.called
+
+
+async def test_receiving_notification_with_retry_on_server_error(znc):
+ reader, writer = znc
+
+ async def connected(reader, writer):
+ headers, body = await read_push_request(reader)
+ assert headers['Authorization'] == 'Bearer abcdefg'
+ assert json.loads(body.decode('utf-8')) == {
+ 'badge': 1,
+ 'message': 'Test notification',
+ 'sender': 'palaver',
+ 'network': 'b758eaab1a4611a310642a6e8419fbff'
+ }
+
+ if not hasattr(connected, 'requests'):
+ connected.requests = 1
+ writer.write(b'HTTP/1.1 503 Service Unavailable\r\n')
+ else:
+ connected.requests += 1
+ writer.write(b'HTTP/1.1 204 No Content\r\n')
+
+ writer.write(b'Connection: close\r\n')
+ writer.write(b'\r\n')
+
+ await writer.drain()
+ writer.close()
+
+ server = await asyncio.start_server(connected, host='127.0.0.1', port=8121)
+ await asyncio.sleep(0.2)
+ addr = server.sockets[0].getsockname()
+ url = f'http://{addr[0]}:{addr[1]}/push'
+
+ writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
+ await writer.drain()
+
+ line = await reader.readline()
+ assert line == b'PALAVER REQ *\r\n'
+
+ writer.write(b'PALAVER BEGIN 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e\r\n')
+ writer.write(f'PALAVER SET PUSH-ENDPOINT {url}\r\n'.encode('utf-8'))
+ writer.write(f'PALAVER SET PUSH-TOKEN abcdefg\r\n'.encode('utf-8'))
+ writer.write(b'PALAVER END\r\n')
+ await writer.drain()
+
+ writer.write(b'PRIVMSG *palaver :test\r\n')
+ await writer.drain()
+
+ line = await reader.readline()
+ assert line == b':*palaver!znc@znc.in PRIVMSG admin :Notification sent to 1 clients.\r\n'
+
+ await asyncio.sleep(1.2)
+ server.close()
+ await server.wait_closed()
+
+ assert connected.requests == 2