# 2024-10-04 16:50:23 +00:00
import http . client
import datetime
import argparse
import json
import specs
import base64
import time
import math
import sys
import re
import os
import socket
import asyncio
from urllib . parse import urlparse
import subprocess
from functools import partial
class logger:
    """Tiny console logger printing colorized, timestamped lines to stdout.

    Every line has the shape ``<color><time> | <LEVEL> | <message><reset>``.
    """

    # ANSI escape sequences: one color per level plus the reset code.
    RED = '\033[91m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    RESET = '\033[0m'

    @staticmethod
    def _get_current_time():
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        now = datetime.datetime.now()
        return now.strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _emit(color, level, message):
        """Print *message* tagged with *level*, wrapped in *color* and the reset code."""
        stamp = logger._get_current_time()
        print(f"{color}{stamp} | {level} | {message}{logger.RESET}")

    @staticmethod
    def error(message):
        """Print *message* as a red ERROR line."""
        logger._emit(logger.RED, "ERROR", message)

    @staticmethod
    def success(message):
        """Print *message* as a green SUCCESS line."""
        logger._emit(logger.GREEN, "SUCCESS", message)

    @staticmethod
    def info(message):
        """Print *message* as a blue INFO line."""
        logger._emit(logger.BLUE, "INFO", message)
# Abort immediately unless the effective user ID is 0 (root).
if os.geteuid() != 0:
    logger.error("This script must be run as root!")
    sys.exit(1)

# Command-line interface.
parser = argparse.ArgumentParser(description="Script with --clore-endpoint flag.")
parser.add_argument(
    '--clore-endpoint',
    type=str,
    default="https://api.clore.ai/machine_onboarding",
    help='Specify the Clore API endpoint. Default is "https://api.clore.ai/machine_onboarding".',
)
parser.add_argument(
    '--mock',
    action='store_true',
    help='Report predefined machine specs (testing only)',
)
parser.add_argument('--mode', type=str, default="linux")
parser.add_argument('--write-linux-config', type=str, default="")
parser.add_argument('--linux-hostname-override', default="")
parser.add_argument(
    '--auth-file',
    type=str,
    default="/opt/clore-hosting/client/auth",
    help='Auth file location',
)
args = parser.parse_args()

# Configuration file locations; --mock switches all of them to the relative
# "mock/" directory.
if args.mock:
    hive_hive_wallet_conf_path = "mock/wallet.conf"
    hive_rig_conf_path = "mock/rig.conf"
    hive_oc_conf_path = "mock/oc.conf"
    clore_conf_path = "mock/onboarding.json"
else:
    hive_hive_wallet_conf_path = "/hive-config/wallet.conf"
    hive_rig_conf_path = "/hive-config/rig.conf"
    hive_oc_conf_path = "/hive-config/nvidia-oc.conf"
    clore_conf_path = "/opt/clore-hosting/onboarding.json"
async def run_command(command: str):
    """Run *command* via ``/bin/bash -c`` and return its captured output.

    The blocking ``subprocess.run`` call is handed to the running loop's
    default executor so the coroutine can be awaited.

    Args:
        command: Shell command line passed to bash.

    Returns:
        A ``(stdout, stderr)`` tuple of decoded text. The exit status is not
        inspected.
    """
    argv = ['/bin/bash', '-c', command]
    blocking_call = partial(
        subprocess.run,
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    completed = await asyncio.get_running_loop().run_in_executor(None, blocking_call)
    return completed.stdout, completed.stderr
def clean_config_value(value):
    """Normalize the raw right-hand side of a ``KEY=value`` config line.

    Everything from the first ``#`` onwards is dropped, surrounding
    whitespace is stripped, and one pair of matching single or double quotes
    enclosing the whole value is removed.
    """
    cleaned = value.partition('#')[0].strip()
    for quote in ("'", '"'):
        # Only unwrap when the same quote character opens and closes the value.
        if len(cleaned) >= 2 and cleaned[0] == quote and cleaned[-1] == quote:
            return cleaned[1:-1]
    return cleaned
def filter_name(name):
    """Return *name* with every character outside ``A-Za-z0-9_-`` removed."""
    disallowed = re.compile(r'[^A-Za-z0-9_-]')
    return disallowed.sub('', name)
def validate_clore_config(clore_config):
    """Validate a Clore onboarding configuration.

    Args:
        clore_config: The decoded configuration. Anything that is not a dict
            is reported as invalid instead of raising.

    Returns:
        The string ``"Validation successful"`` when every check passes,
        otherwise a list of human-readable error messages.
    """

    def is_number(value):
        # bool is a subclass of int; True/False are not valid prices or multipliers.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def in_range(value, low, high):
        # Type-check first so strings/None are reported as invalid rather
        # than raising TypeError from the comparison.
        return is_number(value) and low <= value <= high

    def is_valid_hostname_override(value):
        # fullmatch instead of match(...$): "$" would also accept a trailing newline.
        return (
            isinstance(value, str)
            and 1 <= len(value) <= 64
            and re.fullmatch(r'[A-Za-z0-9_-]+', value) is not None
        )

    def is_valid_auth(value):
        return isinstance(value, str) and len(value) <= 256

    def is_valid_multipliers(value):
        return (
            isinstance(value, dict)
            and in_range(value.get("on_demand_multiplier"), 1, 50)
            and in_range(value.get("spot_multiplier"), 1, 50)
        )

    def is_valid_mrl(value):
        return isinstance(value, int) and 6 <= value <= 1440

    def is_valid_keep_params(value):
        return isinstance(value, bool)

    def is_valid_pricing(config):
        # A missing key yields None from .get(), which in_range rejects.
        return (
            in_range(config.get("on_demand_bitcoin"), 0.000001, 0.005)
            and in_range(config.get("on_demand_clore"), 0.1, 5000)
            and in_range(config.get("spot_bitcoin"), 0.000001, 0.005)
            and in_range(config.get("spot_clore"), 0.1, 5000)
        )

    def is_valid_usd_pricing(autoprice):
        return (
            in_range(autoprice.get("spot"), 0.1, 1000)
            and in_range(autoprice.get("on_demand"), 0.1, 1000)
        )

    if not isinstance(clore_config, dict):
        return ["config must be a JSON object"]

    errors = []

    if "hostname_override" in clore_config and not is_valid_hostname_override(clore_config["hostname_override"]):
        errors.append("hostname_override must be a string between 1-64 characters, only A-Za-z0-9_- allowed")

    if "auth" not in clore_config or not is_valid_auth(clore_config["auth"]):
        errors.append("auth is mandatory and must be a string of max 256 character")

    autoprice = clore_config.get("autoprice")
    if isinstance(autoprice, dict):
        # A truthy "usd" entry selects USD pricing; otherwise multipliers are expected.
        if autoprice.get("usd"):
            if not is_valid_usd_pricing(autoprice):
                errors.append("usd pricing input is invalid")
        elif not is_valid_multipliers(autoprice):
            errors.append("multipliers are not following spec")

    if "mrl" not in clore_config or not is_valid_mrl(clore_config["mrl"]):
        errors.append("mrl is mandatory and must be an integer in range 6-1440")

    if "keep_params" in clore_config and not is_valid_keep_params(clore_config["keep_params"]):
        errors.append("keep_params must be a boolean value")

    # Crypto pricing is all-or-nothing: once one field is present, all four
    # must be present and in range.
    crypto_keys = {"on_demand_bitcoin", "on_demand_clore", "spot_bitcoin", "spot_clore"}
    if any(key in clore_config for key in crypto_keys):
        if not is_valid_pricing(clore_config):
            errors.append("All pricing fields (on_demand_bitcoin, on_demand_clore, spot_bitcoin, spot_clore) must be specified and valid.")

    return errors if errors else "Validation successful"
def base64_string_to_json(base64_string):
    """Decode a base64 string containing UTF-8 JSON.

    Missing ``=`` padding is re-added before decoding.

    Returns:
        The parsed JSON value, or ``None`` if any step (padding, base64
        decoding, UTF-8 decoding, JSON parsing) fails.
    """
    try:
        remainder = len(base64_string) % 4
        if remainder:
            base64_string += '=' * (4 - remainder)
        decoded_text = base64.b64decode(base64_string).decode('utf-8')
        return json.loads(decoded_text)
    except Exception:
        return None
def get_default_power_limits():
    """Query ``nvidia-smi`` for the default power limit of each GPU.

    The fifth whitespace-separated field of every "Default Power Limit" line
    is converted to an int (fractional part dropped). Lines equal to "n/a"
    or otherwise not numeric are skipped.

    Returns:
        A non-empty list of ints, or ``None`` when nothing could be parsed or
        the command failed.
    """
    query = "nvidia-smi -q -d POWER | grep \"Default Power Limit\" | awk '{print $5}'"
    try:
        completed = subprocess.run(query, shell=True, check=True, capture_output=True, text=True)
        limits = []
        for raw in completed.stdout.strip().split('\n'):
            if raw.lower() == 'n/a':
                continue
            try:
                limits.append(int(float(raw)))
            except ValueError:
                continue
        return limits or None
    except Exception:
        # Covers CalledProcessError (non-zero exit) as well as anything else.
        return None
def validate_and_convert(input_str, min_val, max_val, adjust_bounds=False):
    """Parse a whitespace-separated list of integers and apply bounds.

    Args:
        input_str: Text such as ``"100 200 300"``.
        min_val: Lower bound (inclusive).
        max_val: Upper bound (inclusive).
        adjust_bounds: When true, out-of-range numbers are clamped into
            ``[min_val, max_val]``; when false, any out-of-range number makes
            the whole input invalid.

    Returns:
        The list of ints, or ``None`` if parsing fails or (without
        ``adjust_bounds``) a number is out of range.
    """
    try:
        values = list(map(int, input_str.split()))
        if adjust_bounds:
            return [max(min(v, max_val), min_val) for v in values]
        if any(v < min_val or v > max_val for v in values):
            return None
        return values
    except Exception:
        return None
def get_number_or_last(numbers, index):
    """Return ``numbers[index]``, or the last element when *index* is past the end."""
    if index >= len(numbers):
        return numbers[-1]
    return numbers[index]
async def async_read_file(path):
    """Return the text content of *path*, or ``None`` if it cannot be read.

    The function is a coroutine, but the read itself is an ordinary
    synchronous ``open``/``read``; nothing is awaited inside.
    """
    try:
        with open(path, 'r') as handle:
            content = handle.read()
    except Exception:
        # Missing or unreadable files are reported to the caller as None.
        return None
    return content
# 2024-10-31 02:21:49 +00:00
def extract_last_setcore_setmem(input_string):
    """Extract the numbers following the last ``--setcore`` and ``--setmem`` flags.

    Each flag may be followed by one or more whitespace-separated integers;
    when a flag occurs several times only its last occurrence is used.

    Args:
        input_string: Text to scan.

    Returns:
        A ``(setcore_values, setmem_values)`` tuple of int lists. A flag that
        does not occur yields an empty list; unusable input (for example
        ``None``) yields ``([], [])``.
    """

    def last_values(flag):
        # One shared pattern for both flags keeps them in sync.
        pattern = rf'--{flag}\s+((?:\d+\s*)+)(?=\D|$)'
        matches = re.findall(pattern, input_string)
        return [int(num) for num in matches[-1].split()] if matches else []

    try:
        return last_values('setcore'), last_values('setmem')
    except (TypeError, ValueError):
        # Same tuple shape as the success path so callers can rely on it.
        return [], []
# 2024-10-04 16:50:23 +00:00
async def hive_parse_config(file_content):
    """Parse ``KEY=value`` lines into a dict.

    Lines are stripped; lines starting with ``#`` or without ``=`` are
    ignored. The key is everything before the first ``=`` and the value is
    the remainder passed through ``clean_config_value``. Later duplicates
    overwrite earlier ones. Empty or ``None`` content yields an empty dict.
    """
    parsed = {}
    if not file_content:
        return parsed
    for raw_line in file_content.split('\n'):
        stripped = raw_line.strip()
        if stripped[:1] == "#" or '=' not in stripped:
            continue
        key, _, raw_value = stripped.partition('=')
        parsed[key] = clean_config_value(raw_value)
    return parsed
def _hive_iter_config_pairs(content):
    """Yield ``(key, cleaned_value)`` for every non-comment ``KEY=value`` line of *content*."""
    for raw_line in content.split('\n'):
        line = raw_line.strip()
        if line[:1] != "#" and '=' in line:
            key, _, raw_value = line.partition('=')
            yield key, clean_config_value(raw_value)


def _hive_build_oc_config(nvidia_oc, default_power_limits, fs_core_lock, fs_mem_lock):
    """Build the per-GPU overclock dict from nvidia-oc.conf text.

    Raises whatever the parsing raises (e.g. ``TypeError`` when a CLOCK/MEM
    value is not a list of integers); the caller decides how to handle it.
    """
    core_offset = None
    mem_offset = None
    core_lock = None
    mem_lock = None
    pl_static = None
    for key, value in _hive_iter_config_pairs(nvidia_oc):
        if value == "":
            continue
        if key == "CLOCK":
            # Offsets are halved (rounded down) before being reported.
            core_offset = [num // 2 for num in validate_and_convert(value, -2000, 2000, adjust_bounds=True)]
        elif key == "MEM":
            mem_offset = [num // 2 for num in validate_and_convert(value, -2000, 6000, adjust_bounds=True)]
        elif key == "PLIMIT":
            pl_static = validate_and_convert(value, 1, 1500, adjust_bounds=True)
        elif key == "LCLOCK":
            core_lock = validate_and_convert(value, 0, 12000, adjust_bounds=True)
        elif key == "LMEM":
            mem_lock = validate_and_convert(value, 0, 32000, adjust_bounds=True)

    # Locks given via --setcore/--setmem in wallet.conf take precedence over
    # LCLOCK/LMEM from nvidia-oc.conf.
    if len(fs_mem_lock) > 0:
        mem_lock = fs_mem_lock
    if len(fs_core_lock) > 0:
        core_lock = fs_core_lock

    oc_config = {}
    if not (core_offset or mem_offset or pl_static or mem_lock or core_lock):
        return oc_config

    # One entry per GPU; a settings list shorter than the GPU count repeats
    # its last value for the remaining GPUs.
    for gpu_idx, default_pl in enumerate(default_power_limits):
        entry = {
            "core": get_number_or_last(core_offset, gpu_idx) if core_offset else 0,
            "mem": get_number_or_last(mem_offset, gpu_idx) if mem_offset else 0,
            "pl": get_number_or_last(pl_static, gpu_idx) if pl_static else default_pl,
        }
        if isinstance(core_lock, list) and len(core_lock) > 0:
            entry["core_lock"] = get_number_or_last(core_lock, gpu_idx)
        if isinstance(mem_lock, list) and len(mem_lock) > 0:
            entry["mem_lock"] = get_number_or_last(mem_lock, gpu_idx)
        oc_config[str(gpu_idx)] = entry
    return oc_config


async def hive_load_configs(default_power_limits, static_config):
    """Load machine name, Clore config and overclock settings from the Hive config files.

    Args:
        default_power_limits: Per-GPU default power limits; used as the "pl"
            value for GPUs without a PLIMIT entry and to determine how many
            GPU entries are generated.
        static_config: JSON text of a previously saved Clore config (or
            ``None``); used when wallet.conf provides no usable Clore
            config.

    Returns:
        ``(machine_name, clore_config, out_oc_config)``. If loading fails
        with an unexpected exception the error is logged and ``None`` is
        returned implicitly.

    Side effects:
        Exits with status 1 when rig.conf lacks WORKER_NAME or RIG_ID. When
        no Clore miner is configured, the clore/docker services are disabled
        and stopped and the process exits with status 0.
    """
    parsed_static_config = None
    try:
        parsed_static_config = json.loads(static_config)
    except (TypeError, ValueError):
        # No saved config, or not valid JSON: continue without a fallback.
        pass

    try:
        wallet_conf_content = await async_read_file(hive_hive_wallet_conf_path)
        rig_conf_content = await async_read_file(hive_rig_conf_path)

        # Parse rig config
        rig_conf = await hive_parse_config(rig_conf_content)
        if not rig_conf or "WORKER_NAME" not in rig_conf or "RIG_ID" not in rig_conf:
            print("WORKER_NAME or RIG_ID is missing from rig config")
            # sys.exit (not os._exit) so buffered output is flushed; SystemExit
            # is not caught by the "except Exception" below.
            sys.exit(1)

        # Parse wallet config
        clore_miner_present = False
        clore_config = None
        fs_core_lock = []
        fs_mem_lock = []
        if wallet_conf_content:
            fs_core_lock, fs_mem_lock = extract_last_setcore_setmem(wallet_conf_content)
            for key, value in _hive_iter_config_pairs(wallet_conf_content):
                if key[-9:] == "_TEMPLATE":
                    # Any *_TEMPLATE value that decodes as base64 JSON is taken
                    # as the Clore config; the last such value wins.
                    possible_clore_config = base64_string_to_json(value)
                    if possible_clore_config:
                        clore_config = possible_clore_config
                elif key == "CUSTOM_MINER" and value == "clore":
                    clore_miner_present = True

        # NOTE(review): the saved config is applied whether or not wallet.conf
        # could be read — confirm this matches the intended behavior.
        if (not clore_miner_present or not clore_config) and parsed_static_config:
            clore_miner_present = True
            clore_config = parsed_static_config

        get_oc_config = False
        try:
            if clore_config and "set_stock_oc" in clore_config:
                get_oc_config = True
        except TypeError:
            # clore_config decoded to something that does not support "in".
            pass

        if not clore_miner_present:
            logger.info("CLORE not found in flighsheet, exiting")
            await run_command("systemctl disable clore-hosting.service ; systemctl stop clore-hosting.service ; systemctl disable docker ; systemctl stop docker ; systemctl disable clore-onboarding.service ; systemctl stop clore-onboarding.service")
            sys.exit(0)

        out_oc_config = {}
        if get_oc_config and default_power_limits:
            nvidia_oc = await async_read_file(hive_oc_conf_path)
            if nvidia_oc:
                try:
                    out_oc_config = _hive_build_oc_config(nvidia_oc, default_power_limits, fs_core_lock, fs_mem_lock)
                except Exception as oc_info_e:
                    # Best effort: a broken OC file must not block onboarding,
                    # but say so instead of failing silently.
                    logger.error(f"Can't parse nvidia-oc.conf, ignoring overclock settings | {oc_info_e}")
                    out_oc_config = {}

        # Construct machine name
        machine_name = f"{filter_name(rig_conf['WORKER_NAME'][:32])}_HIVE_{rig_conf['RIG_ID']}"
        return machine_name, clore_config, out_oc_config
    except Exception as e:
        logger.error(f"Can't load rig.conf, wallet.conf | {e}")
async def post_request(url, body, headers=None, timeout=15):
    """POST *body* as JSON to *url* and return the status and decoded response.

    The call is a coroutine, but the HTTP exchange itself uses the
    synchronous ``http.client`` API; nothing is awaited inside.

    Args:
        url: ``http://`` or ``https://`` URL.
        body: JSON-serializable payload.
        headers: Optional extra request headers. The mapping is copied, never
            modified.
        timeout: Socket timeout in seconds.

    Returns:
        ``(status_code, response_data)`` where ``response_data`` is the
        parsed JSON value, or the raw text when the body is not JSON. On a
        connection, timeout or protocol failure ``(None, None)`` is returned.

    Raises:
        ValueError: If the URL scheme is neither ``http`` nor ``https``.
    """
    parsed_url = urlparse(url)
    if parsed_url.scheme == 'https':
        conn = http.client.HTTPSConnection(parsed_url.hostname, parsed_url.port or 443, timeout=timeout)
    elif parsed_url.scheme == 'http':
        conn = http.client.HTTPConnection(parsed_url.hostname, parsed_url.port or 80, timeout=timeout)
    else:
        raise ValueError(f"Unsupported URL scheme: {parsed_url.scheme}")

    json_data = json.dumps(body)

    # Work on a copy so the caller's dict does not gain a Content-Type entry.
    request_headers = dict(headers) if headers else {}
    request_headers['Content-Type'] = 'application/json'

    # A URL without a path would otherwise produce an invalid request line.
    path = parsed_url.path or '/'
    if parsed_url.query:
        path += '?' + parsed_url.query

    try:
        conn.request("POST", path, body=json_data, headers=request_headers)
        response = conn.getresponse()
        response_data = response.read().decode('utf-8')
        try:
            response_data = json.loads(response_data)
        except ValueError:
            # Not JSON: hand the raw text back to the caller.
            pass
        return response.status, response_data
    except (http.client.HTTPException, OSError) as e:
        # OSError covers timeouts (including socket.timeout on older Python
        # versions), refused connections, DNS and TLS failures.
        print(f"Request failed: {e}")
        return None, None
    finally:
        conn.close()
def clean_clore_config(clore_config):
    """Return a shallow copy of *clore_config* without the keys that are not sent in the request body.

    Removed keys: ``auth``, ``hostname_override``, ``set_stock_oc`` and
    ``save_config``. The input dict is left untouched.
    """
    sanitized = clore_config.copy()
    for local_only_key in ("auth", "hostname_override", "set_stock_oc", "save_config"):
        sanitized.pop(local_only_key, None)
    return sanitized
def verify_or_update_file(file_path: str, expected_content: str) -> bool:
    """Make sure *file_path* holds exactly *expected_content*.

    Returns:
        ``True`` when the file already had the expected content, ``False``
        when it had to be (re)written. Note that ``True`` is also returned
        when reading or writing raises, so ``True`` does not guarantee the
        file is in place.
    """
    try:
        already_matches = False
        if os.path.exists(file_path):
            with open(file_path, 'r', encoding='utf-8') as existing:
                already_matches = existing.read() == expected_content
        if already_matches:
            return True
        with open(file_path, 'w', encoding='utf-8') as target:
            target.write(expected_content)
        return False
    except Exception:
        return True
def get_machine_id():
    """Return the stripped content of ``/etc/machine-id``, or ``None`` if that file does not exist."""
    machine_id_path = "/etc/machine-id"
    if not os.path.isfile(machine_id_path):
        return None
    with open(machine_id_path, "r") as handle:
        return handle.read().strip()
next_retry_reached_server_limit = 0

# One-shot mode: --write-linux-config takes a base64-encoded JSON config,
# writes it to clore_conf_path and exits without starting the onboarding loop.
if args.write_linux_config:
    linux_config = base64_string_to_json(args.write_linux_config)
    if not linux_config:
        logger.error("Invalid config")
        sys.exit(1)

    requested_hostname = args.linux_hostname_override
    if requested_hostname:
        hostname_ok = 1 <= len(requested_hostname) <= 64 and re.match(r'^[A-Za-z0-9_-]+$', requested_hostname)
        if not hostname_ok:
            logger.error("Input hostname not valid")
            sys.exit(1)
        linux_config["hostname_override"] = requested_hostname

    verify_or_update_file(clore_conf_path, json.dumps(linux_config))
    logger.success("Config written")
    sys.exit(0)
async def main(machine_specs):
    """Run the onboarding loop: load the config, validate it and register the machine.

    Every iteration reloads the configuration (from the Linux config file or
    the Hive config files, depending on ``args.mode``). When the config
    differs from the last one sent, or a server-limit retry is due, it is
    posted to ``args.clore_endpoint`` and the response is acted upon. The
    loop then sleeps 5 seconds and repeats forever.

    Args:
        machine_specs: Machine specification data sent as ``specs`` in the
            request body.

    Exits with status 1 when the machine ID or the GPU default power limits
    cannot be determined, or when ``args.mode`` is not supported.
    """
    last_used_config = None
    ever_pending_creation = False
    # Must be initialized here: the name is assigned further down, which makes
    # it local to this function, so reading it before the first request has
    # completed would raise UnboundLocalError.
    next_retry_reached_server_limit = 0

    machine_id = get_machine_id()
    default_power_limits = get_default_power_limits()

    if not machine_id:
        logger.error("Can't load machine ID")
        sys.exit(1)
    if not default_power_limits:
        logger.error("Can't load default power limits of nVidia GPU(s)")
        sys.exit(1)
    if args.mode not in ("linux", "hive"):
        # The mode never changes at runtime, so looping would only repeat the error.
        logger.error(f"Unsupported mode: {args.mode}")
        sys.exit(1)

    oc_config = {}

    while True:
        try:
            if args.mode == "linux":
                clore_config = await async_read_file(clore_conf_path)
                clore_config = json.loads(clore_config)
                machine_name = f"LINUX_{machine_id}"
            else:
                static_clore_config = await async_read_file(clore_conf_path)
                machine_name, clore_config, oc_config = await hive_load_configs(default_power_limits, static_clore_config)

            config_validation = validate_clore_config(clore_config)
            if config_validation == "Validation successful":
                if "save_config" in clore_config and args.mode == "hive":
                    verify_or_update_file(clore_conf_path, json.dumps(clore_config))
                if "set_stock_oc" in clore_config:
                    if oc_config == {}:
                        clore_config["clear_oc_override"] = True
                    else:
                        clore_config["stock_oc_override"] = oc_config

                retry_due = 0 < next_retry_reached_server_limit < time.time()
                if clore_config != last_used_config or retry_due:
                    # Snapshot before "name"/"specs" are added so the next
                    # iteration's freshly loaded config compares equal.
                    last_used_config = clore_config.copy()
                    if isinstance(clore_config, dict) and "hostname_override" in clore_config:
                        machine_name = clore_config["hostname_override"]
                    clore_config["name"] = machine_name
                    clore_config["specs"] = machine_specs

                    try:
                        status_code, response_data = await post_request(
                            args.clore_endpoint,
                            clean_clore_config(clore_config),
                            {"auth": clore_config["auth"]},
                            15,
                        )
                    except OSError as request_error:
                        # Network-level failure: treat it like any other
                        # unusable response so the request is retried.
                        logger.error(f"Request failed: {request_error}")
                        status_code, response_data = None, None

                    next_retry_reached_server_limit = 0
                    unknown_response = False
                    if not isinstance(response_data, dict):
                        unknown_response = True
                    elif response_data.get("status") == "invalid_auth":
                        logger.error("Invalid auth token")
                    elif response_data.get("error") == "exceeded_rate_limit":
                        logger.error("Exceeded API limits, probably a lot of servers on same network")
                        logger.info("Retrying request in 65s")
                        await asyncio.sleep(60)
                        last_used_config = None
                    elif response_data.get("status") == "exceeded_limit":
                        logger.error("Your account already has the maximal server limit, retrying in 12hr")
                        next_retry_reached_server_limit = time.time() + 60 * 60 * 12
                    elif response_data.get("status") == "creation_pending":
                        logger.info("Machine creation is pending on clore.ai side")
                        # First poll after 10s, later polls after 60s.
                        await asyncio.sleep(60 if ever_pending_creation else 10)
                        ever_pending_creation = True
                        last_used_config = None
                    elif "init_communication_token" in response_data and "private_communication_token" in response_data:
                        clore_hosting_sw_auth_str = f"{response_data['init_communication_token']}:{response_data['private_communication_token']}"
                        was_ok = verify_or_update_file(args.auth_file, clore_hosting_sw_auth_str)
                        if was_ok:
                            logger.info("Token for hosting software already configured")
                            await run_command("systemctl start clore-hosting.service")
                        else:
                            logger.success("Updated local auth file, restarting clore hosting")
                            await run_command("systemctl restart clore-hosting.service")
                    else:
                        unknown_response = True

                    if unknown_response:
                        # 60s here plus the 5s at the end of the loop.
                        logger.error("Unknown API response, retrying in 65s")
                        await asyncio.sleep(60)
                        last_used_config = None
            else:
                logger.error(f"Could not parse config - {' | '.join(config_validation)}")
        except Exception as e:
            print(e)
        await asyncio.sleep(5)
if __name__ == "__main__":
    # Collect the machine specs once up front, then hand them to the
    # onboarding loop.
    asyncio.run(main(specs.get(benchmark_disk=True, mock=args.mock)))