To create a virtual environment using Python’s built-in venv:
python3 -m venv venv
source venv/bin/activate
Once activated, install Flask:
pip install Flask
A minimal Flask application can be defined in app.py:
from flask import Flask
app = Flask(__name__)
@app.route('/')
def home():
return '<p>Hello, World!</p>'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Set the application entry point and run:
export FLASK_APP=app.py
flask run
For remote access, bind to all interfaces:
flask run --host=0.0.0.0
Production Deployment with WSGI Servers
Flask applications are WSGI-compliant and can be deployed behind production-grade servers such as Gunicorn or uWSGI for better performance and stability:
pip install gunicorn
gunicorn --bind 0.0.0.0:5000 --workers 4 app:app
Secure CI/CD Integration with GitLab
Automating deployments via GitLab CI requires securing the webhook endpoint to prevent unauthorized triggers. Two key protections are essential:
- Token Authentciation — Validate an expected secret token sent in the request header.
- IP Whitelisting — Restrict access to known GitLab server IPs only.
Below is a secure Flask endpoint that validates both token and source IP before triggering a deployment:
from flask import Flask, request
import hashlib
import os
import shutil
import git
import configparser
import requests
import json
app = Flask(__name__)
# Security configuration
VALID_TOKEN = os.getenv('GITLAB_WEBHOOK_TOKEN')
VALID_IP = os.getenv('GITLAB_SERVER_IP')
# Webhook notification via WeChat Enterprise
WECHAT_WEBHOOK = os.getenv('WECHAT_ROBOT_WEBHOOK')
def send_wechat_notification(message):
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "text",
"text": {"content": message}
}
response = requests.post(WECHAT_WEBHOOK, headers=headers, json=payload)
if response.status_code != 200:
print("Failed to send WeChat notification")
@app.route('/', methods=['POST'])
def deploy():
# Validate token
if request.headers.get('X-Gitlab-Token') != VALID_TOKEN:
return 'Unauthorized', 403
# Validate source IP
if request.remote_addr != VALID_IP:
return 'Forbidden', 403
send_wechat_notification("Deployment triggered by GitLab")
# Define paths
REPO_DIR = '/opt/deploy/current'
PREV_REPO_DIR = '/opt/deploy/previous'
GIT_URL = os.getenv('REPO_URL')
DOT_GIT_PATH = os.path.join(REPO_DIR, '.git')
# Clean previous deployment
if os.path.exists(PREV_REPO_DIR):
shutil.rmtree(PREV_REPO_DIR)
if os.path.exists(REPO_DIR):
os.rename(REPO_DIR, PREV_REPO_DIR)
# Clone new code
git.Repo.clone_from(GIT_URL, REPO_DIR)
# Remove .git directory for security
if os.path.exists(DOT_GIT_PATH):
shutil.rmtree(DOT_GIT_PATH)
# Load host configurations
def load_hosts_config(path):
config = configparser.ConfigParser(allow_no_value=True)
config.read(path)
return config
current_hosts = load_hosts_config(os.path.join(REPO_DIR, 'hosts'))
previous_hosts = load_hosts_config(os.path.join(PREV_REPO_DIR, 'hosts'))
# Determine engine types
LINUX_ENGINES = ['arcabit', 'antivir', 'clamav', 'cyren', 'drweb', 'ikarus', 'k7', 'fprot', 'fortinet', 'fsecure', 'mcafee', 'trend', 'virusbuster', 'vba']
WINDOWS_ENGINES = ['ahnlab', 'alyac', 'gridinsoft', 'tws', 'systweak', 'tachyon', 'gdata', 'panda', 'kingsoft', 'baidu', 'emsisoft', '360', 'comodo', 'quickheal', 'meta', 'xvirus', 'jiangmin', 'antiy', 'sunbelt', 'avast', 'nano', 'sangfor', 'rising']
# Compare and remove obsolete configurations
def remove_old_configs(current, prev):
for section in prev.sections():
if section not in current.sections():
continue
prev_ips = set(prev.items(section))
curr_ips = set(current.items(section))
removed_ips = prev_ips - curr_ips
if not removed_ips:
continue
ip_list = [ip[0] for ip in removed_ips]
engine_name = section
if engine_name in LINUX_ENGINES:
# Stop and remove systemd service and config
stop_engine(engine_name, ip_list)
remove_config(engine_name, ip_list, 'linux')
remove_service(engine_name, ip_list)
elif engine_name in WINDOWS_ENGINES:
# Stop and remove Windows config
stop_windows_engine(engine_name, ip_list)
remove_windows_config(engine_name, ip_list)
# Deploy new configurations
def deploy_new_configs(current, prev):
for section in current.sections():
if section in prev.sections():
prev_ips = set(prev.items(section))
curr_ips = set(current.items(section))
added_ips = curr_ips - prev_ips
if added_ips:
ip_list = [ip[0] for ip in added_ips]
deploy_to_hosts(section, ip_list)
else:
# New engine group
deploy_to_hosts(section, [])
def deploy_to_hosts(engine, ip_list):
if engine in LINUX_ENGINES:
deploy_linux_engine(engine, ip_list)
elif engine in WINDOWS_ENGINES:
deploy_windows_engine(engine, ip_list)
def deploy_linux_engine(engine, ip_list):
# Deploy binary
bin_path = os.path.join(REPO_DIR, 'bin', 'csscand')
dest_path = '/opt/cloudscan/gobin/csscand'
copy_binary(bin_path, dest_path, ip_list)
chown_app_user(ip_list)
# Deploy config
config_path = os.path.join(REPO_DIR, 'conf', 'conf', f'csscand-{engine}.yaml')
deploy_config(config_path, f'csscand-{engine}.yaml', ip_list)
# Deploy systemd service
service_path = os.path.join(REPO_DIR, 'systemdService', f'cs{engine}.service')
deploy_service(service_path, f'cs{engine}.service', ip_list)
reload_systemd(ip_list)
enable_service(f'cs{engine}.service', ip_list)
restart_service(f'cs{engine}.service', ip_list)
def deploy_windows_engine(engine, ip_list):
# Deploy Windows binary and config
bin_path = os.path.join(REPO_DIR, 'bin', 'csscand.exe')
dest_path = 'D:\\v0.0.0\\app\\csscand.exe'
copy_windows_binary(bin_path, dest_path, engine)
config_path = os.path.join(REPO_DIR, 'conf', 'conf', f'csscand-{engine}.yaml')
deploy_windows_config(config_path, f'csscand-{engine}.yaml', engine)
# Restart Windows service
restart_windows_service(engine)
def copy_binary(src, dest, targets=None):
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m copy -a 'src={src} dest={dest} mode=0755' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def copy_windows_binary(src, dest, engine):
cmd = f"ansible {engine} -m win_copy -a 'src={src} dest={dest}'"
os.system(cmd)
def deploy_config(src, dest, targets=None):
path = f'/opt/cloudscan/conf/conf/{dest}'
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m copy -a 'src={src} dest={path}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def deploy_windows_config(src, dest, engine):
path = f'D:\\\\v0.0.0\\\\conf\\\\{dest}'
cmd = f"ansible {engine} -m win_copy -a 'src={src} dest={path}'"
os.system(cmd)
def deploy_service(src, dest, targets=None):
path = f'/usr/lib/systemd/system/{dest}'
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m copy -a 'src={src} dest={path}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def reload_systemd(targets=None):
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m shell -a 'systemctl daemon-reload' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def enable_service(service_name, targets=None):
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m shell -a 'systemctl enable {service_name}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def restart_service(service_name, targets=None):
cmd = f"ansible {' '.join(targets) if targets else 'linux'} -m shell -a 'systemctl restart {service_name}; systemctl status {service_name}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def stop_engine(engine, ip_list):
cmd = f"ansible {' '.join(ip_list)} -m shell -a 'systemctl stop cs{engine}; systemctl status cs{engine}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def remove_config(engine, ip_list, os_type):
path = f'/opt/cloudscan/conf/conf/csscand-{engine}.yaml'
cmd = f"ansible {' '.join(ip_list)} -m shell -a 'rm -f {path}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def remove_service(service_name, ip_list):
path = f'/usr/lib/systemd/system/{service_name}'
cmd = f"ansible {' '.join(ip_list)} -m shell -a 'rm -f {path}; systemctl daemon-reload; systemctl disable {service_name}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
def stop_windows_engine(engine, ip_list):
cmd = f"ansible {engine} -m win_shell -a 'd:\\\\v0.0.0\\\\supervisord.exe ctl stop {engine} /u csscand /P g575JwqwJkLDBpbq'"
os.system(cmd)
def remove_windows_config(engine, ip_list):
path = f'D:\\\\v0.0.0\\\\conf\\\\csscand-{engine}.yaml'
cmd = f"ansible {engine} -m win_shell -a 'Remove-Item -Path \"{path}\"'"
os.system(cmd)
def restart_windows_service(engine):
cmd = f"ansible {engine} -m win_shell -a 'd:\\\\v0.0.0\\\\supervisord.exe ctl start {engine} /u csscand /P g575JwqwJkLDBpbq'"
os.system(cmd)
def chown_app_user(ip_list):
cmd = f"ansible {' '.join(ip_list)} -m shell -a 'chown -R app:app /opt/cloudscan' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root"
os.system(cmd)
# Execute deployment steps
remove_old_configs(current_hosts, previous_hosts)
shutil.copy(os.path.join(REPO_DIR, 'hosts'), '/etc/ansible/hosts')
deploy_new_configs(current_hosts, previous_hosts)
deploy_conf_files(REPO_DIR, PREV_REPO_DIR)
deploy_systemd_services(REPO_DIR, PREV_REPO_DIR)
deploy_binaries(REPO_DIR, PREV_REPO_DIR)
send_wechat_notification("Deployment completed successfully")
return 'Deployment successful', 200
def deploy_conf_files(repo_dir, prev_dir):
conf_dir = os.path.join(repo_dir, 'conf')
prev_conf_dir = os.path.join(prev_dir, 'conf')
for conf_file in os.listdir(conf_dir):
if conf_file == 'conf' or conf_file == 'supervisord.conf':
continue
src = os.path.join(conf_dir, conf_file)
dst = os.path.join('/opt/cloudscan/conf', conf_file)
if os.path.exists(os.path.join(prev_conf_dir, conf_file)):
with open(src, 'rb') as f:
new_md5 = hashlib.md5(f.read()).hexdigest()
with open(os.path.join(prev_conf_dir, conf_file), 'rb') as f:
old_md5 = hashlib.md5(f.read()).hexdigest()
if new_md5 == old_md5:
continue
# Deploy only if changed or new
os.system(f"ansible linux -m copy -a 'src={src} dest={dst}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root")
def deploy_systemd_services(repo_dir, prev_dir):
service_dir = os.path.join(repo_dir, 'systemdService')
prev_service_dir = os.path.join(prev_dir, 'systemdService')
for svc_file in os.listdir(service_dir):
if not svc_file.endswith('.service'):
continue
src = os.path.join(service_dir, svc_file)
dst = f'/usr/lib/systemd/system/{svc_file}'
if os.path.exists(os.path.join(prev_service_dir, svc_file)):
with open(src, 'rb') as f:
new_md5 = hashlib.md5(f.read()).hexdigest()
with open(os.path.join(prev_service_dir, svc_file), 'rb') as f:
old_md5 = hashlib.md5(f.read()).hexdigest()
if new_md5 == old_md5:
continue
os.system(f"ansible linux -m copy -a 'src={src} dest={dst}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root")
os.system(f"ansible linux -m shell -a 'systemctl daemon-reload; systemctl enable {svc_file}; systemctl restart {svc_file}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root")
def deploy_binaries(repo_dir, prev_dir):
bin_dir = os.path.join(repo_dir, 'bin')
prev_bin_dir = os.path.join(prev_dir, 'bin')
for bin_file in os.listdir(bin_dir):
if bin_file in ('csscand', 'csscand.exe'):
continue
src = os.path.join(bin_dir, bin_file)
dst = f'/opt/cloudscan/gobin/{bin_file}'
if os.path.exists(os.path.join(prev_bin_dir, bin_file)):
with open(src, 'rb') as f:
new_md5 = hashlib.md5(f.read()).hexdigest()
with open(os.path.join(prev_bin_dir, bin_file), 'rb') as f:
old_md5 = hashlib.md5(f.read()).hexdigest()
if new_md5 == old_md5:
continue
os.system(f"ansible linux -m copy -a 'src={src} dest={dst} mode=0755' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root")
os.system(f"ansible linux -m shell -a 'systemctl restart cs{bin_file[:-4]}' --private-key /opt/deploy/.ssh/id_rsa --become --become-user=root")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
This deployment script integrates with GitLab via secure webhooks, performs atomic updates by renaming directories, compares file hashes to avoid redundant deployments, and orchestrates cross-platform (Linux/Windows) configuration and service management using Ansible. The endpoint only executes when both token and IP match preconfigured values, minimizing exposure to unauthorized triggers.