assert product2_version == await get_znc_version()
-async def setUp(event_loop):
+async def setUp():
running_as_root = os.getuid() == 0
allow_root = ' --allow-root' if running_as_root else ''
- proc = await asyncio.create_subprocess_shell(f'znc -d test/fixtures --foreground --debug{allow_root}', loop=event_loop)
+ proc = await asyncio.create_subprocess_shell(f'znc -d test/fixtures --foreground --debug{allow_root}')
time.sleep(31 if running_as_root else 1)
- (reader, writer) = await asyncio.open_connection('localhost', 6698, loop=event_loop)
+ (reader, writer) = await asyncio.open_connection('localhost', 6698)
writer.write(b'CAP LS 302\r\n')
line = await reader.readline()
os.remove(config)
-async def test_registering_device(event_loop):
- (proc, reader, writer) = await setUp(event_loop)
+async def test_registering_device():
+ (proc, reader, writer) = await setUp()
writer.write(b'PALAVER IDENTIFY 9167e47b01598af7423e2ecd3d0a3ec4 611d3a30a3d666fc491cdea0d2e1dd6e b758eaab1a4611a310642a6e8419fbff\r\n')
await writer.drain()
await tearDown(proc)
-async def test_loading_module_new_cap(event_loop):
+async def test_loading_module_new_cap():
await requires_znc_version('1.7.0')
- (proc, reader, writer) = await setUp(event_loop)
+ (proc, reader, writer) = await setUp()
writer.write(b'PRIVMSG *status :unloadmod palaver\r\n')
await writer.drain()
await tearDown(proc)
-async def test_unloading_module_del_cap(event_loop):
+async def test_unloading_module_del_cap():
await requires_znc_version('1.7.0')
- (proc, reader, writer) = await setUp(event_loop)
+ (proc, reader, writer) = await setUp()
writer.write(b'PRIVMSG *status :unloadmod palaver\r\n')
await writer.drain()
await tearDown(proc)
-async def test_receiving_notification(event_loop):
- (proc, reader, writer) = await setUp(event_loop)
+async def test_receiving_notification():
+ (proc, reader, writer) = await setUp()
async def connected(reader, writer):
line = await reader.readline()
connected.called = True
- server = await asyncio.start_server(connected, host='127.0.0.1', port=0, loop=event_loop)
+ server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
await asyncio.sleep(0.2)
addr = server.sockets[0].getsockname()
url = f'Serving on http://{addr[0]}:{addr[1]}/push'
assert connected.called
-async def test_receiving_notification_with_push_token(event_loop):
- (proc, reader, writer) = await setUp(event_loop)
+async def test_receiving_notification_with_push_token():
+ (proc, reader, writer) = await setUp()
async def connected(reader, writer):
line = await reader.readline()
connected.called = True
- server = await asyncio.start_server(connected, host='127.0.0.1', port=0, loop=event_loop)
+ server = await asyncio.start_server(connected, host='127.0.0.1', port=0)
await asyncio.sleep(0.2)
addr = server.sockets[0].getsockname()
url = f'Serving on http://{addr[0]}:{addr[1]}/push'