Add bcrypt_test_db, secure_connection

This commit is contained in:
Roland Thomas Jr 2023-09-15 13:27:34 -04:00
parent 1b4ba81ab0
commit 0a64bb27ba
Signed by: roland
GPG Key ID: 7C3C2B085A4C2872
4 changed files with 277 additions and 0 deletions

73
bcrypt_test_db.py Executable file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""bcrypt_test.py
This module provides utilities for user authentication, including functions to
store hashed passwords, create new users, and authenticate existing users using
bcrypt for password hashing and a sqlite database to store user data.
"""
import bcrypt
import sqlite3
def initialize_database():
conn = sqlite3.connect("passwords_db.db")
c = conn.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS users
(username TEXT PRIMARY KEY, hashed_password TEXT)"""
)
conn.commit()
conn.close()
def store_password(password):
hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
return hashed_password
def check_password(password, hashed_password):
return bcrypt.checkpw(password.encode("utf-8"), hashed_password)
def create_user(username, password):
hashed_password = store_password(password).decode("utf-8")
conn = sqlite3.connect("passwords_db.db")
c = conn.cursor()
try:
c.execute(
"INSERT INTO users (username, hashed_password) VALUES (?, ?)",
(username, hashed_password),
)
conn.commit()
except sqlite3.IntegrityError:
print("Username already exists")
finally:
conn.close()
def authenticate_user(username, password):
conn = sqlite3.connect("passwords_db.db")
c = conn.cursor()
c.execute(
"SELECT hashed_password FROM users WHERE username = ?", (username,)
)
row = c.fetchone()
conn.close()
if row:
hashed_password = row[0].encode("utf-8")
return check_password(password, hashed_password)
else:
return False
if __name__ == "__main__":
initialize_database()
create_user("user1", "mysecurepassword")
if authenticate_user("user1", "mysecurepassword"):
print("Authentication successful")
else:
print("Authentication failed")

View File

@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""crypto_server.py
----------------
This module contains utilities for generating and storing server keys and
certificates for the Pylandia Secure Email Initiative. It utilizes the
`cryptography` library to create a private key and a self-signed certificate,
which are then saved to specified paths.
Functions:
- parse_args() -> Namespace: Parses the command line arguments.
- generate_key_pair(storage_path: Path): Generates a key pair and a
certificate and writes them to the specified storage path.
"""
from argparse import ArgumentParser, Namespace
from datetime import datetime, timedelta
from pathlib import Path
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
def parse_args() -> Namespace:
"""Parses the command line arguments for the script.
Returns:
Namespace: A namespace object that holds the command line arguments as
attributes.
"""
parser = ArgumentParser(description="Pylandia Server Key Generator")
parser.add_argument(
"-p",
"--path",
default="server_keys",
help="Path to save private and public keys",
)
return parser.parse_args()
def generate_key_pair(storage_path: Path):
"""Generates a private key and a self-signed certificate and saves them to
the specified storage path.
Args:
storage_path (Path): The directory where the private key and certificate
files should be saved.
Returns:
None
"""
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=4096, backend=default_backend()
)
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "PY"),
x509.NameAttribute(
NameOID.STATE_OR_PROVINCE_NAME, "Central Province"
),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Pylandia City"),
x509.NameAttribute(
NameOID.ORGANIZATION_NAME, "Pylandia Secure Email Initiative"
),
x509.NameAttribute(NameOID.COMMON_NAME, "securemail.pylandia"),
]
)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
critical=False,
)
.sign(private_key, hashes.SHA256(), default_backend())
)
if not storage_path.exists():
storage_path.mkdir(parents=True)
publickey_path = storage_path / "cert.pem"
with publickey_path.open("wb") as file:
file.write(cert.public_bytes(serialization.Encoding.PEM))
privatekey_path = storage_path / "key.pem"
with privatekey_path.open("wb") as file:
file.write(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
if __name__ == "__main__":
args = parse_args()
storage = args.path
full_storage_path = Path(storage)
generate_key_pair(full_storage_path)

View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
A module to establish a non-verifying SSL connection with a server and to
send/receive messages through a secure socket.
Usage:
Run the script to start a communication session with the server at
"127.0.0.1" on port 9001. Input messages in the console to send to
the server and receive replies. Type a message with the content
"quit" to close the connection.
Functions:
main(server_address: str, server_port: int): Initiates a secure socket
communication session with the specified server.
"""
from socket import create_connection, socket
from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_NONE
def main(server_address: str, server_port: int):
context = SSLContext(PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = CERT_NONE
unsecure_socket: socket = create_connection(
(server_address, server_port)
)
secure_socket = context.wrap_socket(unsecure_socket) #, server_hostname="localhost")
while True:
message = input()
secure_socket.send(message.encode("utf-8"))
reply = secure_socket.recv(len(message))
if reply == b"quit":
break
print(reply.decode("utf-8"))
if __name__ == "__main__":
main("127.0.0.1", 9001)

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""
This module facilitates creating an asynchronous SSL server that echoes
received messages. The server operates using SSL certificates stored in
the "server_keys" directory.
Usage:
Run the script to initiate the server on "localhost" at port 9001.
It listens for incoming connections and echoes messages unless "quit"
is received, which terminates the connection.
Functions:
handle_client(reader: StreamReader, writer: StreamWriter) -> None:
Handles the communication with a connected client asynchronously.
main(address: str, port: int) -> None:
Sets up and starts the SSL server with the necessary certificates.
"""
import logging
from asyncio import StreamReader, StreamWriter, run, start_server
from pathlib import Path
from ssl import Purpose, create_default_context
async def handle_client(reader: StreamReader, writer: StreamWriter):
while True:
msg_bytes = await reader.read(10)
if msg_bytes == b"quit":
writer.write(b"quit")
await writer.drain()
break
writer.write(msg_bytes)
await writer.drain()
async def main(address: str, port: int):
storage_path: Path = Path("server_keys")
keyfile = storage_path / "key.pem"
certfile = storage_path / "cert.pem"
context = create_default_context(Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile, keyfile)
server = await start_server(handle_client, address, port, ssl=context)
service_socket = server.sockets[0].getsockname()
logging.info("Serving on %s", service_socket)
async with server:
await server.serve_forever()
if __name__ == "__main__":
run(main("localhost", 9001))