Non-incremental fixed index working

Signed-off-by: Tiziano Bacocco <tizbac2@gmail.com>
This commit is contained in:
Tiziano Bacocco 2025-10-12 00:31:41 +02:00
parent fab15205f7
commit c8c99b3c3c
10 changed files with 292 additions and 88 deletions

View File

@ -1,5 +1,5 @@
module clientcommon
go 1.25.2
go 1.24.4
require github.com/rodolfoag/gow32 v0.0.0-20230512144032-1e896a3c51aa

View File

@ -1,6 +1,6 @@
module directorybackup
go 1.25.2
go 1.24.4
require (
github.com/cornelk/hashmap v1.0.8

View File

@ -86,7 +86,7 @@ func (c *ChunkState) HandleData(b []byte, client *pbscommon.PBSClient) {
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(c.current_chunk))
c.newchunk.Add(1)
client.UploadCompressedChunk(c.wrid, shahash, c.current_chunk)
client.UploadDynamicCompressedChunk(c.wrid, shahash, c.current_chunk)
} else {
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(c.current_chunk))
c.reusechunk.Add(1)
@ -129,7 +129,7 @@ func (c *ChunkState) Eof(client *pbscommon.PBSClient) {
if _, ok := c.knownChunks.GetOrInsert(shahash, true); !ok {
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(c.current_chunk))
client.UploadCompressedChunk(c.wrid, shahash, c.current_chunk)
client.UploadDynamicCompressedChunk(c.wrid, shahash, c.current_chunk)
c.newchunk.Add(1)
} else {
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(c.current_chunk))
@ -147,7 +147,7 @@ func (c *ChunkState) Eof(client *pbscommon.PBSClient) {
if k2 > len(c.assignments) {
k2 = len(c.assignments)
}
client.AssignChunks(c.wrid, c.assignments[k:k2], c.assignments_offset[k:k2])
client.AssignDynamicChunks(c.wrid, c.assignments[k:k2], c.assignments_offset[k:k2])
}
client.CloseDynamicIndex(c.wrid, hex.EncodeToString(c.chunkdigests.Sum(nil)), c.pos, c.chunkcount)

View File

@ -1,4 +1,4 @@
go 1.25.2
go 1.24.4
use ./clientcommon
use ./pbscommon

View File

@ -35,14 +35,13 @@ type Config struct {
Datastore string `json:"datastore"`
Namespace string `json:"namespace"`
BackupID string `json:"backup-id"`
BackupSourceDir string `json:"backupdir"`
BackupStreamName string `json:"backupstreamname"`
BackupDevice string `json:"backupdev"`
PxarOut string `json:"pxarout"`
SMTP *SMTPConfig `json:"smtp"`
}
func (c *Config) valid() bool {
baseValid := c.BaseURL != "" && c.AuthID != "" && c.Secret != "" && c.Datastore != "" && (c.BackupSourceDir != "" || c.BackupStreamName != "")
baseValid := c.BaseURL != "" && c.AuthID != "" && c.Secret != "" && c.Datastore != "" && c.BackupDevice != ""
if !baseValid {
return baseValid
}
@ -70,9 +69,8 @@ func loadConfig() *Config {
datastoreFlag := flag.String("datastore", "", "Datastore name")
namespaceFlag := flag.String("namespace", "", "Namespace (optional)")
backupIDFlag := flag.String("backup-id", "", "Backup ID (optional - if not specified, the hostname is used as the default)")
backupSourceDirFlag := flag.String("backupdir", "", "Backup source directory, must not be symlink")
backupStreamNameFlag := flag.String("backupstream", "", "Filename for stream backup")
pxarOutFlag := flag.String("pxarout", "", "Output PXAR archive for debug purposes (optional)")
backupDEVFlag := flag.String("backupdev", "", "Backup device file")
mailHostFlag := flag.String("mail-host", "", "mail notification system: mail server host(optional)")
mailPortFlag := flag.String("mail-port", "", "mail notification system: mail server port(optional)")
@ -124,16 +122,10 @@ func loadConfig() *Config {
if *backupIDFlag != "" {
config.BackupID = *backupIDFlag
}
if *backupSourceDirFlag != "" {
config.BackupSourceDir = *backupSourceDirFlag
if *backupDEVFlag != "" {
config.BackupDevice = *backupDEVFlag
}
if *backupStreamNameFlag != "" {
config.BackupStreamName = *backupStreamNameFlag
}
if *pxarOutFlag != "" {
config.PxarOut = *pxarOutFlag
}
initSmtpConfigIfNeeded := func() {
if config.SMTP == nil {

View File

@ -1,6 +1,6 @@
module machinebackup
go 1.25.2
go 1.24.4
require (
github.com/go-ole/go-ole v1.2.6 // indirect

View File

@ -2,17 +2,24 @@ package main
import (
"crypto/sha256"
"fmt"
"encoding/hex"
"path/filepath"
"hash"
"log"
"clientcommon"
"flag"
"fmt"
"io"
"os"
"pbscommon"
"snapshot"
"strings"
"runtime"
"sync/atomic"
"github.com/cornelk/hashmap"
"github.com/shirou/gopsutil/disk"
"github.com/gen2brain/beeep"
"github.com/getlantern/systray"
"github.com/tawesoft/golib/v2/dialog"
)
var defaultMailSubjectTemplate = "Backup {{.Status}}"
@ -51,10 +58,13 @@ func (c *ChunkState) Init(newchunk *atomic.Uint64, reusechunk *atomic.Uint64, kn
}
func main() {
/* var newchunk *atomic.Uint64 = new(atomic.Uint64)
var reusechunk *atomic.Uint64 = new(atomic.Uint64)*/
var newchunk *atomic.Uint64 = new(atomic.Uint64)
var reusechunk *atomic.Uint64 = new(atomic.Uint64)
knownChunks := hashmap.New[string, bool]()
CS := ChunkState{}
CS.Init(newchunk, reusechunk, knownChunks)
/*cfg := loadConfig()
cfg := loadConfig()
if ok := cfg.valid(); !ok {
if runtime.GOOS == "windows" {
@ -91,9 +101,9 @@ func main() {
})
}
//insecure := cfg.CertFingerprint != ""
insecure := cfg.CertFingerprint != ""
/*client := &pbscommon.PBSClient{
client := &pbscommon.PBSClient{
BaseURL: cfg.BaseURL,
CertFingerPrint: cfg.CertFingerprint, //"ea:7d:06:f9:87:73:a4:72:d0:e8:05:a4:b3:3d:95:d7:0a:26:dd:6d:5c:ca:e6:99:83:e4:11:3b:5f:10:f4:4b",
AuthID: cfg.AuthID,
@ -105,15 +115,98 @@ func main() {
BackupID: cfg.BackupID,
},
}
hostname, err := os.Hostname()
/*hostname, err := os.Hostname()
if err != nil {
fmt.Println("Failed to retrieve hostname:", err)
hostname = "unknown"
}*/
//begin := time.Now()
F, err := os.Open(cfg.BackupDevice)
if err != nil {
panic(err)
}
pos, err := F.Seek(0, io.SeekEnd)
if err != nil {
panic(err)
}
total := pos
_, err = F.Seek(0, io.SeekStart)
if err != nil {
panic(err)
}
client.Connect(false)
wrid , err := client.CreateFixedIndex(pbscommon.FixedIndexCreateReq{
ArchiveName: filepath.Base(cfg.BackupDevice)+".fidx",
Size: total,
})
if err != nil {
panic(err)
}
begin := time.Now()*/
partitions, err := disk.Partitions(false) // false means don't include virtual partitions
//Blocks are 4MB as per proxmox docs
block := make([]byte, 4*1024*1024)
for ; CS.pos < uint64(total); {
nread, err := F.Read(block)
if err != nil {
panic(err)
}
if nread <= 0 {
panic("Short read")
}
h := sha256.New()
_, err = h.Write(block[:nread])
if err != nil {
panic(err)
}
shahash := hex.EncodeToString(h.Sum(nil))
//binary.Write(CS.chunkdigests, binary.LittleEndian, (CS.pos + uint64(nread)))
CS.chunkdigests.Write(h.Sum(nil))
_, exists := knownChunks.GetOrInsert(shahash, true)
if exists {
reusechunk.Add(1)
}else{
err = client.UploadFixedCompressedChunk(wrid, shahash, block[:nread])
if err != nil {
panic(err)
}
}
CS.assignments = append(CS.assignments, shahash)
CS.assignments_offset = append(CS.assignments_offset, CS.pos)
CS.pos += uint64(nread)
CS.chunkcount++
fmt.Printf("Chunk %d\n", CS.chunkcount)
}
//Avoid incurring in request entity too large by chunking assignment PUT requests in blocks of at most 128 chunks
for k := 0; k < len(CS.assignments); k += 128 {
k2 := k + 128
if k2 > len(CS.assignments) {
k2 = len(CS.assignments)
}
err = client.AssignFixedChunks(wrid, CS.assignments[k:k2], CS.assignments_offset[k:k2])
if err != nil {
panic(err)
}
}
err = client.CloseFixedIndex(wrid, hex.EncodeToString(CS.chunkdigests.Sum(nil)), CS.pos, CS.chunkcount)
if err != nil {
panic(err)
}
err = client.UploadManifest()
if err != nil {
panic(err)
}
client.Finish()
/*partitions, err := disk.Partitions(false) // false means don't include virtual partitions
if err != nil {
log.Fatalf("Error fetching partitions: %v", err)
}
@ -150,5 +243,16 @@ func main() {
panic(err)
} else {
fmt.Print(n)
}
}*/
//Windows backup logic will be as follows
//1. Enumerate fixed non-usb disks ( SATA + NVME )
//2. Enumerate partitions with offset and length
//3. Start reading using PhysicalDriveX special file
//4. If we go into a region that contains a mounted partition, if filesystem is NTFS or ReFS , take VSS snapshot and switch to the associated shadow volume file
//4. If the partition is not mounted just keep reading, if the partition is mounted and not NTFS or ReFS for now throw a warning and write zeros
//5. For each disk create a fixed index ( Do it in parallel maybe)
}

View File

@ -1,6 +1,6 @@
module pbscommon
go 1.25.2
go 1.24.4
require (
github.com/klauspost/compress v1.18.0

View File

@ -33,7 +33,7 @@ type IndexPutReq struct {
WriterID uint64 `json:"wid"`
}
type DynamicCloseReq struct {
type IndexCloseReq struct {
ChunkCount uint64 `json:"chunk-count"`
CheckSum string `json:"csum"`
Size uint64 `json:"size"`
@ -54,6 +54,11 @@ type ChunkUploadStats struct {
Size int64 `json:"size"`
}
type FixedIndexCreateReq struct {
ArchiveName string `json:"archive-name"`
Size int64 `json:"size"`
}
type Unprotected struct {
ChunkUploadStats ChunkUploadStats `json:"chunk_upload_stats"`
}
@ -95,9 +100,115 @@ type PBSClient struct {
var blobCompressedMagic = []byte{49, 185, 88, 66, 111, 182, 163, 127}
var blobUncompressedMagic = []byte{66, 171, 56, 7, 190, 131, 112, 161}
func (pbs *PBSClient) CreateFixedIndex(fic FixedIndexCreateReq) (uint64, error) {
jd,err := json.Marshal(fic)
if err != nil {
return 0, err
}
req, err := http.NewRequest("POST", pbs.BaseURL+"/fixed_index", bytes.NewBuffer(jd))
if err != nil {
return 0, err
}
req.Header.Add("Authorization", fmt.Sprintf("PBSAPIToken=%s:%s", pbs.AuthID, pbs.Secret))
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
resp2, err := pbs.Client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return 0, err
}
if resp2.StatusCode != http.StatusOK {
resp1, err := io.ReadAll(resp2.Body)
fmt.Println("Error making request:", string(resp1), string(resp2.Proto))
return 0, err
}
resp1, err := io.ReadAll(resp2.Body)
if err != nil {
return 0, err
}
var R IndexCreateResp
err = json.Unmarshal(resp1, &R)
if err != nil {
fmt.Println("Error parsing JSON:", err)
return 0, err
}
fmt.Println("Writer id: ", R.WriterID)
defer resp2.Body.Close()
f := File{
CryptMode: "none",
Csum: "",
Filename: fic.ArchiveName,
Size: 0,
}
pbs.Manifest.Files = append(pbs.Manifest.Files, f)
pbs.WritersManifest[uint64(R.WriterID)] = len(pbs.Manifest.Files) - 1
return uint64(R.WriterID), nil
}
func (pbs *PBSClient) AssignFixedChunks(writerid uint64, digests []string, offsets []uint64) error {
indexput := &IndexPutReq{
WriterID: writerid,
DigestList: digests,
OffsetList: offsets,
}
jsondata, err := json.Marshal(indexput)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", pbs.BaseURL+"/fixed_index", bytes.NewBuffer(jsondata))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
resp2, err := pbs.Client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return err
}
defer resp2.Body.Close()
return nil
}
func (pbs *PBSClient) CloseFixedIndex(writerid uint64, checksum string, totalsize uint64, chunkcount uint64) error {
finishreq := &IndexCloseReq{
WriterID: writerid,
CheckSum: checksum,
Size: totalsize,
ChunkCount: chunkcount,
}
jsonpayload, err := json.Marshal(finishreq)
if err != nil {
return err
}
req, err := http.NewRequest("POST", pbs.BaseURL+"/fixed_close", bytes.NewBuffer(jsonpayload))
if err != nil {
return err
}
req.Header.Add("Authorization", fmt.Sprintf("PBSAPIToken=%s:%s", pbs.AuthID, pbs.Secret))
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
resp2, err := pbs.Client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return err
}
f := &pbs.Manifest.Files[pbs.WritersManifest[writerid]]
f.Csum = checksum
f.Size = int64(totalsize)
defer resp2.Body.Close()
return nil
}
func (pbs *PBSClient) CreateDynamicIndex(name string) (uint64, error) {
req, err := http.NewRequest("POST", pbs.BaseURL+"/dynamic_index", bytes.NewBuffer([]byte(fmt.Sprintf("{\"archive-name\": \"%s\"}", name))))
if err != nil {
return 0, err
@ -119,6 +230,9 @@ func (pbs *PBSClient) CreateDynamicIndex(name string) (uint64, error) {
}
resp1, err := io.ReadAll(resp2.Body)
if err != nil {
return 0, err
}
var R IndexCreateResp
err = json.Unmarshal(resp1, &R)
if err != nil {
@ -138,58 +252,46 @@ func (pbs *PBSClient) CreateDynamicIndex(name string) (uint64, error) {
return uint64(R.WriterID), nil
}
func (pbs *PBSClient) UploadUncompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
outBuffer := make([]byte, 0)
outBuffer = append(outBuffer, blobUncompressedMagic...)
checksum := crc32.Checksum(chunkdata, crc32.IEEETable)
outBuffer = binary.LittleEndian.AppendUint32(outBuffer, checksum)
outBuffer = append(outBuffer, chunkdata...)
q := &url.Values{}
q.Add("digest", digest)
q.Add("encoded-size", fmt.Sprintf("%d", len(outBuffer)))
q.Add("size", fmt.Sprintf("%d", len(chunkdata)))
q.Add("wid", fmt.Sprintf("%d", writerid))
req, err := http.NewRequest("POST", pbs.BaseURL+"/dynamic_chunk?"+q.Encode(), bytes.NewBuffer(outBuffer))
if err != nil {
return err
}
resp2, err := pbs.Client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return err
}
if resp2.StatusCode != http.StatusOK {
resp1, err := io.ReadAll(resp2.Body)
fmt.Println("Error making request:", string(resp1), string(resp2.Proto))
return err
}
return nil
func (pbs *PBSClient) UploadDynamicUncompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
return pbs.UploadChunk(writerid, digest, chunkdata, true, false)
}
func (pbs *PBSClient) UploadFixedUncompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
return pbs.UploadChunk(writerid, digest, chunkdata, false, false)
}
func (pbs *PBSClient) UploadDynamicCompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
return pbs.UploadChunk(writerid, digest, chunkdata, true, true)
}
func (pbs *PBSClient) UploadFixedCompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
return pbs.UploadChunk(writerid, digest, chunkdata, false, true)
}
func (pbs *PBSClient) UploadCompressedChunk(writerid uint64, digest string, chunkdata []byte) error {
func (pbs *PBSClient) UploadChunk(writerid uint64, digest string, chunkdata []byte, dynamic bool, compressed bool) error {
outBuffer := make([]byte, 0)
outBuffer = append(outBuffer, blobCompressedMagic...)
compressedData := make([]byte, 0)
if compressed {
outBuffer = append(outBuffer, blobCompressedMagic...)
compressedData := make([]byte, 0)
//opt := zstd.WithEncoderLevel(zstd.SpeedFastest)
w, _ := zstd.NewWriter(nil)
compressedData = w.EncodeAll(chunkdata, compressedData)
checksum := crc32.Checksum(compressedData, crc32.IEEETable)
//binary.Write(outBuffer, binary.LittleEndian, checksum)
outBuffer = binary.LittleEndian.AppendUint32(outBuffer, checksum)
//opt := zstd.WithEncoderLevel(zstd.SpeedFastest)
w, _ := zstd.NewWriter(nil)
compressedData = w.EncodeAll(chunkdata, compressedData)
checksum := crc32.Checksum(compressedData, crc32.IEEETable)
//binary.Write(outBuffer, binary.LittleEndian, checksum)
outBuffer = binary.LittleEndian.AppendUint32(outBuffer, checksum)
//fmt.Printf("Appended checksum %08x , len: %d\n", checksum, len(outBuffer))
//fmt.Printf("Appended checksum %08x , len: %d\n", checksum, len(outBuffer))
outBuffer = append(outBuffer, compressedData...)
outBuffer = append(outBuffer, compressedData...)
if len(compressedData) > len(chunkdata) {
pbs.UploadUncompressedChunk(writerid, digest, chunkdata)
return nil
if len(compressedData) > len(chunkdata) {
return pbs.UploadChunk(writerid, digest, chunkdata, dynamic, false)
}
}else{
outBuffer = append(outBuffer, blobUncompressedMagic...)
checksum := crc32.Checksum(chunkdata, crc32.IEEETable)
outBuffer = binary.LittleEndian.AppendUint32(outBuffer, checksum)
outBuffer = append(outBuffer, chunkdata...)
}
//fmt.Printf("Compressed: %d , Orig: %d\n", len(compressedData), len(chunkdata))
q := &url.Values{}
@ -197,9 +299,15 @@ func (pbs *PBSClient) UploadCompressedChunk(writerid uint64, digest string, chun
q.Add("encoded-size", fmt.Sprintf("%d", len(outBuffer)))
q.Add("size", fmt.Sprintf("%d", len(chunkdata)))
q.Add("wid", fmt.Sprintf("%d", writerid))
req, err := http.NewRequest("POST", pbs.BaseURL+"/dynamic_chunk?"+q.Encode(), bytes.NewBuffer(outBuffer))
suburl := "/dynamic_chunk?"
if !dynamic {
suburl = "/fixed_chunk?"
}
req, err := http.NewRequest("POST", pbs.BaseURL+suburl+q.Encode(), bytes.NewBuffer(outBuffer))
if err != nil {
fmt.Println("Error making request:", err)
return err
}
resp2, err := pbs.Client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
@ -215,7 +323,7 @@ func (pbs *PBSClient) UploadCompressedChunk(writerid uint64, digest string, chun
return nil
}
func (pbs *PBSClient) AssignChunks(writerid uint64, digests []string, offsets []uint64) error {
func (pbs *PBSClient) AssignDynamicChunks(writerid uint64, digests []string, offsets []uint64) error {
indexput := &IndexPutReq{
WriterID: writerid,
DigestList: digests,
@ -242,7 +350,7 @@ func (pbs *PBSClient) AssignChunks(writerid uint64, digests []string, offsets []
}
func (pbs *PBSClient) CloseDynamicIndex(writerid uint64, checksum string, totalsize uint64, chunkcount uint64) error {
finishreq := &DynamicCloseReq{
finishreq := &IndexCloseReq{
WriterID: writerid,
CheckSum: checksum,
Size: totalsize,

View File

@ -1,6 +1,6 @@
module snapshot
go 1.25.2
go 1.24.4
require github.com/jeromehadorn/vss v0.1.0