/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ /* Package api contains the REST API for RUFS. /about Endpoint which returns an object with version information. { api_versions : List of available API versions e.g. [ "v1" ] product : Name of the API provider (RUFS) version: : Version of the API provider } */ package api import ( "encoding/json" "net/http" "devt.de/krotik/rufs/config" ) /* EndpointAbout is the about endpoint definition (rooted). Handles about/ */ const EndpointAbout = APIRoot + "/about/" /* AboutEndpointInst creates a new endpoint handler. */ func AboutEndpointInst() RestEndpointHandler { return &aboutEndpoint{} } /* aboutEndpoint is the handler object for about operations. */ type aboutEndpoint struct { *DefaultEndpointHandler } /* HandleGET returns about data for the REST API. */ func (a *aboutEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { data := map[string]interface{}{ "product": "RUFS", "version": config.ProductVersion, } // Write data w.Header().Set("content-type", "application/json; charset=utf-8") ret := json.NewEncoder(w) ret.Encode(data) }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package api import ( "crypto/tls" "fmt" "net/http" "strings" "sync" "devt.de/krotik/rufs" ) /* APIVersion is the version of the REST API */ const APIVersion = "1.0.0" /* APIRoot is the API root directory for the REST API */ const APIRoot = "/fs" /* APISchemes defines the supported schemes by the REST API */ var APISchemes = []string{"https"} /* APIHost is the host definition for the REST API */ var APIHost = "localhost:9040" /* RestEndpointInst models a factory function for REST endpoint handlers. */ type RestEndpointInst func() RestEndpointHandler /* GeneralEndpointMap is a map of urls to general REST endpoints */ var GeneralEndpointMap = map[string]RestEndpointInst{ EndpointAbout: AboutEndpointInst, EndpointSwagger: SwaggerEndpointInst, } /* RestEndpointHandler models a REST endpoint handler. */ type RestEndpointHandler interface { /* HandleGET handles a GET request. */ HandleGET(w http.ResponseWriter, r *http.Request, resources []string) /* HandlePOST handles a POST request. */ HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) /* HandlePUT handles a PUT request. */ HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) /* HandleDELETE handles a DELETE request. */ HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) /* SwaggerDefs is used to describe the endpoint in swagger. */ SwaggerDefs(s map[string]interface{}) } /* trees is a map of all trees which can be used by the REST API */ var trees = make(map[string]*rufs.Tree) var treesLock = sync.Mutex{} /* ResetTrees removes all registered trees. 
*/ var ResetTrees = func() { treesLock.Lock() defer treesLock.Unlock() trees = make(map[string]*rufs.Tree) } /* Trees is a getter function which returns a map of all registered trees. This function can be overwritten by client code to implement access control. */ var Trees = func() (map[string]*rufs.Tree, error) { treesLock.Lock() defer treesLock.Unlock() ret := make(map[string]*rufs.Tree) for k, v := range trees { ret[k] = v } return ret, nil } /* GetTree returns a specific tree. This function can be overwritten by client code to implement access control. */ var GetTree = func(id string) (*rufs.Tree, bool, error) { treesLock.Lock() defer treesLock.Unlock() tree, ok := trees[id] return tree, ok, nil } /* AddTree adds a new tree. This function can be overwritten by client code to implement access control. */ var AddTree = func(id string, tree *rufs.Tree) error { treesLock.Lock() defer treesLock.Unlock() if _, ok := trees[id]; ok { return fmt.Errorf("Tree %v already exists", id) } trees[id] = tree return nil } /* RemoveTree removes a tree. This function can be overwritten by client code to implement access control. */ var RemoveTree = func(id string) error { treesLock.Lock() defer treesLock.Unlock() if _, ok := trees[id]; !ok { return fmt.Errorf("Tree %v does not exist", id) } delete(trees, id) return nil } /* TreeConfigTemplate is the configuration which is used for newly created trees. */ var TreeConfigTemplate map[string]interface{} /* TreeCertTemplate is the certificate which is used for newly created trees. */ var TreeCertTemplate *tls.Certificate /* Map of all registered endpoint handlers. */ var registered = map[string]RestEndpointInst{} /* HandleFunc to use for registering handlers */ var HandleFunc = http.HandleFunc /* RegisterRestEndpoints registers all given REST endpoint handlers. 
*/ func RegisterRestEndpoints(endpointInsts map[string]RestEndpointInst) { for url, endpointInst := range endpointInsts { registered[url] = endpointInst HandleFunc(url, func() func(w http.ResponseWriter, r *http.Request) { var handlerURL = url var handlerInst = endpointInst return func(w http.ResponseWriter, r *http.Request) { // Create a new handler instance handler := handlerInst() // Handle request in appropriate method res := strings.TrimSpace(r.URL.Path[len(handlerURL):]) if len(res) > 0 && res[len(res)-1] == '/' { res = res[:len(res)-1] } var resources []string if res != "" { resources = strings.Split(res, "/") } switch r.Method { case "GET": handler.HandleGET(w, r, resources) case "POST": handler.HandlePOST(w, r, resources) case "PUT": handler.HandlePUT(w, r, resources) case "DELETE": handler.HandleDELETE(w, r, resources) default: http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) } } }()) } } /* DefaultEndpointHandler is the default endpoint handler implementation. */ type DefaultEndpointHandler struct { } /* HandleGET handles a GET request. */ func (de *DefaultEndpointHandler) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) } /* HandlePOST handles a POST request. */ func (de *DefaultEndpointHandler) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) } /* HandlePUT handles a PUT request. */ func (de *DefaultEndpointHandler) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) } /* HandleDELETE handles a DELETE request. */ func (de *DefaultEndpointHandler) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package api import ( "encoding/json" "net/http" ) /* SwaggerDefs is used to describe the endpoint in swagger. */ func (a *aboutEndpoint) SwaggerDefs(s map[string]interface{}) { // Add query paths s["paths"].(map[string]interface{})["/about"] = map[string]interface{}{ "get": map[string]interface{}{ "summary": "Return information about the REST API provider.", "description": "Returns available API versions, product name and product version.", "produces": []string{ "application/json", }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "About info object", "schema": map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "api_versions": map[string]interface{}{ "description": "List of available API versions.", "type": "array", "items": map[string]interface{}{ "description": "Available API version.", "type": "string", }, }, "product": map[string]interface{}{ "description": "Product name of the REST API provider.", "type": "string", }, "version": map[string]interface{}{ "description": "Version of the REST API provider.", "type": "string", }, }, }, }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } // Add generic error object to definition s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{ "description": "A human readable error mesage.", "type": "string", } } /* EndpointSwagger is the swagger endpoint URL (rooted). Handles swagger.json/ */ const EndpointSwagger = APIRoot + "/swagger.json/" /* SwaggerEndpointInst creates a new endpoint handler. 
*/ func SwaggerEndpointInst() RestEndpointHandler { return &swaggerEndpoint{} } /* Handler object for swagger operations. */ type swaggerEndpoint struct { *DefaultEndpointHandler } /* HandleGET returns the swagger definition of the REST API. */ func (a *swaggerEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { // Add general sections data := map[string]interface{}{ "swagger": "2.0", "host": APIHost, "schemes": APISchemes, "basePath": APIRoot, "produces": []string{"application/json"}, "paths": map[string]interface{}{}, "definitions": map[string]interface{}{}, } // Go through all registered components and let them add their definitions a.SwaggerDefs(data) for _, inst := range registered { inst().SwaggerDefs(data) } // Write data w.Header().Set("content-type", "application/json; charset=utf-8") ret := json.NewEncoder(w) ret.Encode(data) } /* SwaggerDefs is used to describe the endpoint in swagger. */ func (a *swaggerEndpoint) SwaggerDefs(s map[string]interface{}) { // Add general application information s["info"] = map[string]interface{}{ "title": "RUFS API", "description": "Query and control the Remote Union File System.", "version": APIVersion, } }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ /* Package v1 contains Rufs REST API Version 1. Admin control endpoint /admin The admin endpoint can be used for various admin tasks such as registering new branches or mounting known branches. A GET request to the admin endpoint returns the current tree configuration; an object of all known branches and the current mapping: { branches : [ <known branches> ], tree : [ <current mapping> ] } A POST request to the admin endpoint creates a new tree. The body of the request should have the following form: "<name>" /admin/<tree> A DELETE request to a particular tree will delete the tree. /admin/<tree>/branch A new branch can be created in an existing tree by sending a POST request to the branch endpoint. The body of the request should have the following form: { branch : <Name of the branch>, rpc : <RPC definition of the remote branch (e.g. localhost:9020)>, fingerprint : <Expected SSL fingerprint of the remote branch or an empty string> } /admin/<tree>/mapping A new mapping can be created in an existing tree by sending a POST request to the mapping endpoint. The body of the request should have the following form: { branch : <Name of the branch>, dir : <Tree directory of the branch root>, writable : <Flag if the branch should handle write operations> } Dir listing endpoing /dir/<tree>/<path> The dir endpoing handles requests for the directory listing of a certain path. A request url should be of the following form: /dir/<tree>/<path>?recursive=<flag>&checksums=<flag> The request can optionally include the flag parameters (value should be 1 or 0) recursive and checksums. 
The recursive flag will add all subdirectories to the listing and the checksums flag will add checksums for all listed files. File queries and manipulation /file/{tree}/{path} A GET request to a specific file will return its contents. A POST will upload a new or overwrite an existing file. A DELETE request will delete an existing file. New files are expected to be uploaded using a multipart/form-data request. When uploading a new file the form field for the file should be named "uploadfile". The form can optionally contain a redirect field which will issue a redirect once the file has been uploaded. A PUT request is used to perform a file operation. The request body should be a JSON object of the form (parameters are operation specific): { action : <Action to perform>, files : <List of (full path) files which should be copied / renamed> newname : <New name of file (when renaming)>, newnames : <List of new file names when renaming multiple files using the files parameter>, destination : <Destination file when copying a single file - Destination directory when copying multiple files using the files parameter or syncing directories> } The action can either be: sync, rename, mkdir or copy. Copy and sync returns a JSON structure containing a progress id: { progress_id : <Id for progress of the copy operation> } Progress information /progress/<progress id> A GET request to the progress endpoint returns the current progress of an ongoing operation. The result should be: { "item": <Currently processing item>, "operation": <Name of operation>, "progress": <Current progress>, "subject": <Name of the subject on which the operation is performed>, "total_items": <Total number of items>, "total_progress": <Total progress> } Create zip files /zip/<tree> A post to the zip enpoint returns a zip file containing requested files. The files to include must be given as a list of file name with full path in the body. The body should be application/x-www-form-urlencoded encoded. 
The list should be a JSON encoded string as value of the value files. The body should have the following form: files=[ "<file1>", "<file2>" ] */ package v1 import ( "encoding/json" "fmt" "net/http" "strconv" "devt.de/krotik/rufs" "devt.de/krotik/rufs/api" ) /* EndpointAdmin is the mount endpoint URL (rooted). Handles everything under admin/... */ const EndpointAdmin = api.APIRoot + APIv1 + "/admin/" /* AdminEndpointInst creates a new endpoint handler. */ func AdminEndpointInst() api.RestEndpointHandler { return &adminEndpoint{} } /* Handler object for admin operations. */ type adminEndpoint struct { *api.DefaultEndpointHandler } /* HandleGET handles an admin query REST call. */ func (a *adminEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { data := make(map[string]interface{}) trees, err := api.Trees() if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } refreshName := r.URL.Query().Get("refresh") for k, v := range trees { var tree map[string]interface{} if refreshName != "" && k == refreshName { v.Refresh() } json.Unmarshal([]byte(v.Config()), &tree) data[k] = tree } // Write data w.Header().Set("content-type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(data) } /* HandlePOST handles REST calls to create a new tree. 
*/ func (a *adminEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) { var tree *rufs.Tree var ok bool var err error var data map[string]interface{} if len(resources) == 0 { var name string if err := json.NewDecoder(r.Body).Decode(&name); err != nil { http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()), http.StatusBadRequest) return } else if name == "" { http.Error(w, fmt.Sprintf("Body must contain the tree name as a non-empty JSON string"), http.StatusBadRequest) return } // Create a new tree tree, err := rufs.NewTree(api.TreeConfigTemplate, api.TreeCertTemplate) if err != nil { http.Error(w, fmt.Sprintf("Could not create new tree: %v", err.Error()), http.StatusBadRequest) return } // Store the new tree if err := api.AddTree(name, tree); err != nil { http.Error(w, fmt.Sprintf("Could not add new tree: %v", err.Error()), http.StatusBadRequest) } return } if !checkResources(w, resources, 2, 2, "Need a tree name and a section (either branches or mapping)") { return } if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok { err = fmt.Errorf("Unknown tree: %v", resources[0]) } if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if err := json.NewDecoder(r.Body).Decode(&data); err != nil { http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()), http.StatusBadRequest) return } if resources[1] == "branch" { // Add a new branch if rpc, ok := getMapValue(w, data, "rpc"); ok { if branch, ok := getMapValue(w, data, "branch"); ok { if fingerprint, ok := getMapValue(w, data, "fingerprint"); ok { if err := tree.AddBranch(branch, rpc, fingerprint); err != nil { http.Error(w, fmt.Sprintf("Could not add branch: %v", err.Error()), http.StatusBadRequest) } } } } } else if resources[1] == "mapping" { // Add a new mapping if _, ok := data["dir"]; ok { if dir, ok := getMapValue(w, data, "dir"); ok { if branch, ok := getMapValue(w, data, "branch"); ok { if writeableStr, ok := 
getMapValue(w, data, "writeable"); ok { writeable, err := strconv.ParseBool(writeableStr) if err != nil { http.Error(w, fmt.Sprintf("Writeable value must be a boolean: %v", err.Error()), http.StatusBadRequest) } else if err := tree.AddMapping(dir, branch, writeable); err != nil { http.Error(w, fmt.Sprintf("Could not add branch: %v", err.Error()), http.StatusBadRequest) } } } } } } } /* HandleDELETE handles REST calls to delete an existing tree. */ func (a *adminEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) { if !checkResources(w, resources, 1, 1, "Need a tree name") { return } // Delete the tree if err := api.RemoveTree(resources[0]); err != nil { http.Error(w, fmt.Sprintf("Could not remove tree: %v", err.Error()), http.StatusBadRequest) } } /* SwaggerDefs is used to describe the endpoint in swagger. */ func (a *adminEndpoint) SwaggerDefs(s map[string]interface{}) { s["paths"].(map[string]interface{})["/v1/admin"] = map[string]interface{}{ "get": map[string]interface{}{ "summary": "Return all current tree configurations.", "description": "All current tree configurations; each object has a list of all known branches and the current mapping.", "produces": []string{ "text/plain", "application/json", }, "parameters": []map[string]interface{}{ { "name": "refresh", "in": "query", "description": "Refresh a particular tree (reload branches and mappings).", "required": false, "type": "string", }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "A key-value map of tree name to tree configuration", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, "post": map[string]interface{}{ "summary": "Create a new tree.", "description": "Create a new named tree.", "consumes": []string{ "application/json", }, "produces": []string{ "text/plain", }, "parameters": []map[string]interface{}{ { "name": "data", "in": "body", 
"description": "Name of the new tree.", "required": true, "schema": map[string]interface{}{ "type": "string", }, }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns an empty body if successful.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } s["paths"].(map[string]interface{})["/v1/admin/{tree}"] = map[string]interface{}{ "delete": map[string]interface{}{ "summary": "Delete a tree.", "description": "Delete a named tree.", "produces": []string{ "text/plain", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns an empty body if successful.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } s["paths"].(map[string]interface{})["/v1/admin/{tree}/branch"] = map[string]interface{}{ "post": map[string]interface{}{ "summary": "Add a new branch.", "description": "Add a new remote branch to the tree.", "consumes": []string{ "application/json", }, "produces": []string{ "text/plain", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, { "name": "data", "in": "body", "description": "Definition of the new branch.", "required": true, "schema": map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "branch": map[string]interface{}{ "description": "Name of the remote branch (must match on the remote branch).", "type": "string", }, "rpc": map[string]interface{}{ "description": "RPC definition of the remote branch (e.g. 
localhost:9020).", "type": "string", }, "fingerprint": map[string]interface{}{ "description": "Expected SSL fingerprint of the remote branch (shown during startup) or an empty string.", "type": "string", }, }, }, }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns an empty body if successful.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } s["paths"].(map[string]interface{})["/v1/admin/{tree}/mapping"] = map[string]interface{}{ "post": map[string]interface{}{ "summary": "Add a new mapping.", "description": "Add a new mapping to the tree.", "consumes": []string{ "application/json", }, "produces": []string{ "text/plain", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, { "name": "data", "in": "body", "description": "Definition of the new branch.", "required": true, "schema": map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "branch": map[string]interface{}{ "description": "Name of the known remote branch.", "type": "string", }, "dir": map[string]interface{}{ "description": "Tree directory which should hold the branch root.", "type": "string", }, "writable": map[string]interface{}{ "description": "Flag if the branch should be mapped as writable.", "type": "string", }, }, }, }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns an empty body if successful.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } // Add generic error object to definition s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{ "description": "A human readable error mesage.", "type": "string", } }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package v1 import ( "encoding/json" "fmt" "net/http" "os" "path" "strconv" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs" "devt.de/krotik/rufs/api" ) /* EndpointDir is the dir endpoint URL (rooted). Handles everything under dir/... */ const EndpointDir = api.APIRoot + APIv1 + "/dir/" /* DirEndpointInst creates a new endpoint handler. */ func DirEndpointInst() api.RestEndpointHandler { return &dirEndpoint{} } /* Handler object for dir operations. */ type dirEndpoint struct { *api.DefaultEndpointHandler } /* HandleGET handles a dir query REST call. */ func (d *dirEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { var tree *rufs.Tree var ok, checksums bool var err error var dirs []string var fis [][]os.FileInfo if len(resources) == 0 { http.Error(w, "Need at least a tree name", http.StatusBadRequest) return } if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok { err = fmt.Errorf("Unknown tree: %v", resources[0]) } if err == nil { var rex string glob := r.URL.Query().Get("glob") recursive, _ := strconv.ParseBool(r.URL.Query().Get("recursive")) checksums, _ = strconv.ParseBool(r.URL.Query().Get("checksums")) if rex, err = stringutil.GlobToRegex(glob); err == nil { dirs, fis, err = tree.Dir(path.Join(resources[1:]...), rex, recursive, checksums) } } if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } data := make(map[string]interface{}) for i, d := range dirs { var flist []map[string]interface{} fi := fis[i] for _, f := range fi { toAdd := map[string]interface{}{ "name": f.Name(), "size": f.Size(), "isdir": f.IsDir(), } if checksums { toAdd["checksum"] = f.(*rufs.FileInfo).Checksum() } flist = append(flist, 
toAdd) } data[d] = flist } // Write data w.Header().Set("content-type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(data) } /* SwaggerDefs is used to describe the endpoint in swagger. */ func (d *dirEndpoint) SwaggerDefs(s map[string]interface{}) { s["paths"].(map[string]interface{})["/v1/dir/{tree}/{path}"] = map[string]interface{}{ "get": map[string]interface{}{ "summary": "Read a directory.", "description": "List the contents of a directory.", "produces": []string{ "text/plain", "application/json", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, { "name": "path", "in": "path", "description": "Directory path.", "required": true, "type": "string", }, { "name": "recursive", "in": "query", "description": "Add listings of subdirectories.", "required": false, "type": "boolean", }, { "name": "checksums", "in": "query", "description": "Include file checksums.", "required": false, "type": "boolean", }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns a map of directories with a list of files as values.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } // Add generic error object to definition s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{ "description": "A human readable error mesage.", "type": "string", } }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package v1 import ( "encoding/json" "fmt" "mime/multipart" "net/http" "path" "time" "devt.de/krotik/common/cryptutil" "devt.de/krotik/common/datautil" "devt.de/krotik/common/errorutil" "devt.de/krotik/common/httputil" "devt.de/krotik/rufs" "devt.de/krotik/rufs/api" ) // Progress endpoint // ================= /* Progress is a persisted data structure which contains the current progress of an ongoing operation. */ type Progress struct { Op string // Operation which we show progress of Subject string // Subject on which the operation is performed Progress int64 // Current progress of the ongoing operation (this is reset for each item) TotalProgress int64 // Total progress required until current operation is finished Item int64 // Current processing item TotalItems int64 // Total number of items to process Errors []string // Any error messages } /* JSONString returns the progress object as a JSON string. */ func (p *Progress) JSONString() []byte { ret, err := json.MarshalIndent(map[string]interface{}{ "operation": p.Op, "subject": p.Subject, "progress": p.Progress, "total_progress": p.TotalProgress, "item": p.Item, "total_items": p.TotalItems, "errors": p.Errors, }, "", " ") errorutil.AssertOk(err) return ret } /* ProgressMap contains information about copy progress. */ var ProgressMap = datautil.NewMapCache(100, 0) /* EndpointProgress is the progress endpoint URL (rooted). Handles everything under progress/... */ const EndpointProgress = api.APIRoot + APIv1 + "/progress/" /* ProgressEndpointInst creates a new endpoint handler. */ func ProgressEndpointInst() api.RestEndpointHandler { return &progressEndpoint{} } /* Handler object for progress operations. 
*/ type progressEndpoint struct { *api.DefaultEndpointHandler } /* HandleGET handles a progress query REST call. */ func (f *progressEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { var ok bool var err error if len(resources) < 2 { http.Error(w, "Need a tree name and a progress ID", http.StatusBadRequest) return } if _, ok, err = api.GetTree(resources[0]); err == nil && !ok { err = fmt.Errorf("Unknown tree: %v", resources[0]) } if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } p, ok := ProgressMap.Get(resources[0] + "#" + resources[1]) if !ok { http.Error(w, fmt.Sprintf("Unknown progress ID: %v", resources[1]), http.StatusBadRequest) return } w.Header().Set("content-type", "application/octet-stream") w.Write(p.(*Progress).JSONString()) } /* SwaggerDefs is used to describe the endpoint in swagger. */ func (f *progressEndpoint) SwaggerDefs(s map[string]interface{}) { s["paths"].(map[string]interface{})["/v1/progress/{tree}/{progress_id}"] = map[string]interface{}{ "get": map[string]interface{}{ "summary": "Request progress update.", "description": "Return a progress object showing the progress of an ongoing operation.", "produces": []string{ "text/plain", "application/json", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, { "name": "progress_id", "in": "path", "description": "Id of progress object.", "required": true, "type": "string", }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns the requested progress object.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } // Add generic error object to definition s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{ "description": "A human readable error mesage.", "type": "string", } } // File 
endpoint // ============= /* EndpointFile is the file endpoint URL (rooted). Handles everything under file/... */ const EndpointFile = api.APIRoot + APIv1 + "/file/" /* FileEndpointInst creates a new endpoint handler. */ func FileEndpointInst() api.RestEndpointHandler { return &fileEndpoint{} } /* Handler object for file operations. */ type fileEndpoint struct { *api.DefaultEndpointHandler } /* HandleGET handles a file query REST call. */ func (f *fileEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) { var tree *rufs.Tree var ok bool var err error if len(resources) < 2 { http.Error(w, "Need a tree name and a file path", http.StatusBadRequest) return } if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok { err = fmt.Errorf("Unknown tree: %v", resources[0]) } if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("content-type", "application/octet-stream") if err := tree.ReadFileToBuffer(path.Join(resources[1:]...), w); err != nil { http.Error(w, fmt.Sprintf("Could not read file %v: %v", path.Join(resources[1:]...), err.Error()), http.StatusBadRequest) return } } /* HandlePUT handles REST calls to modify / copy existing files. */ func (f *fileEndpoint) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) { f.handleFileOp("PUT", w, r, resources) } /* HandleDELETE handles REST calls to delete existing files. 
*/
func (f *fileEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {
	f.handleFileOp("DELETE", w, r, resources)
}

/*
handleFileOp executes the file operation behind a PUT or DELETE request.

The first resource is the tree name, the remaining resources form the path
the operation works on (the tree root if there are none). A DELETE request
may carry a JSON list of files in its body; every other request needs a JSON
object with an action entry (rename, mkdir, copy or sync). Copy and sync run
in the background - their response contains a progress_id which can be used
to query the progress endpoint. All errors are answered with a 400 response.
*/
func (f *fileEndpoint) handleFileOp(requestType string, w http.ResponseWriter, r *http.Request, resources []string) {
	var action string
	var data map[string]interface{}
	var tree *rufs.Tree
	var ok bool
	var err error
	var files []string

	if len(resources) < 1 {
		http.Error(w, "Need a tree name and a file path", http.StatusBadRequest)
		return
	} else if len(resources) == 1 {
		resources = append(resources, "/")
	}

	if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	ret := make(map[string]interface{})

	if requestType == "DELETE" {

		// See if the request contains a body with a list of files - the
		// error only matters if a (partial) list could be decoded

		err = json.NewDecoder(r.Body).Decode(&files)

	} else {

		// Unless it is a delete request we need an action command

		if err = json.NewDecoder(r.Body).Decode(&data); err != nil {
			http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()), http.StatusBadRequest)
			return
		}

		actionObj, ok := data["action"]
		if !ok {
			http.Error(w, "Action command is missing from request body", http.StatusBadRequest)
			return
		}

		action = fmt.Sprint(actionObj)
	}

	fullPath := path.Join(resources[1:]...)
	if fullPath != "/" {
		fullPath = "/" + fullPath
	}

	dir, file := path.Split(fullPath)

	switch {

	case requestType == "DELETE":

		if len(files) == 0 {

			// No file list - delete the item given in the URL

			_, err = tree.ItemOp(dir, map[string]string{
				rufs.ItemOpAction: rufs.ItemOpActDelete,
				rufs.ItemOpName:   file,
			})

		} else {

			// Delete the files given in the body - stop at the first error

			for _, item := range files {
				if err != nil {
					break
				}

				itemDir, itemName := path.Split(item)

				_, err = tree.ItemOp(itemDir, map[string]string{
					rufs.ItemOpAction: rufs.ItemOpActDelete,
					rufs.ItemOpName:   itemName,
				})
			}
		}

	case action == "rename":
		err = f.renameItems(tree, dir, file, data)

	case action == "mkdir":
		_, err = tree.ItemOp(dir, map[string]string{
			rufs.ItemOpAction: rufs.ItemOpActMkDir,
			rufs.ItemOpName:   file,
		})

	case action == "copy":
		dest, hasDest := data["destination"]
		if !hasDest {
			err = fmt.Errorf("Parameter destination is missing from request body")
			break
		}

		// Create file list

		if filesParam, hasFiles := data["files"]; hasFiles {
			lf, isList := filesParam.([]interface{})
			if !isList {
				err = fmt.Errorf("Parameter files must be a list of files")
				break
			}

			files = make([]string, len(lf))
			for i, item := range lf {
				files[i] = fmt.Sprint(item)
			}

		} else {
			files = []string{fullPath}
		}

		// Create progress object

		uuid := fmt.Sprintf("%x", cryptutil.GenerateUUID())
		ret["progress_id"] = uuid

		mapLookup := resources[0] + "#" + uuid

		ProgressMap.Put(mapLookup, &Progress{
			Op:            "Copy",
			Subject:       "",
			Progress:      0,
			TotalProgress: 0,
			Item:          0,
			TotalItems:    int64(len(files)),
			Errors:        []string{},
		})

		err = runProgressOp(mapLookup, func() error {
			return tree.Copy(files, fmt.Sprint(dest),
				func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64) {
					if p, ok := ProgressMap.Get(mapLookup); ok && writtenBytes > 0 {
						p.(*Progress).Subject = file
						p.(*Progress).Progress = writtenBytes
						p.(*Progress).TotalProgress = totalBytes
						p.(*Progress).Item = currentFile
						p.(*Progress).TotalItems = totalFiles
					}
				})
		})

	case action == "sync":
		dest, hasDest := data["destination"]
		if !hasDest {
			err = fmt.Errorf("Parameter destination is missing from request body")
			break
		}

		uuid := fmt.Sprintf("%x", cryptutil.GenerateUUID())
		ret["progress_id"] = uuid

		mapLookup := resources[0] + "#" + uuid

		ProgressMap.Put(mapLookup, &Progress{
			Op:            "Sync",
			Subject:       "",
			Progress:      0,
			TotalProgress: -1,
			Item:          0,
			TotalItems:    -1,
			Errors:        []string{},
		})

		err = runProgressOp(mapLookup, func() error {
			return tree.Sync(fullPath, fmt.Sprint(dest), true,
				func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64) {
					if p, ok := ProgressMap.Get(mapLookup); ok && writtenBytes > 0 {
						p.(*Progress).Op = op
						p.(*Progress).Subject = srcFile
						p.(*Progress).Progress = writtenBytes
						p.(*Progress).TotalProgress = totalBytes
						p.(*Progress).Item = currentFile
						p.(*Progress).TotalItems = totalFiles
					}
				})
		})

	default:
		err = fmt.Errorf("Unknown action: %v", action)
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Write data

	w.Header().Set("content-type", "application/json; charset=utf-8")
	json.NewEncoder(w).Encode(ret)
}

/*
renameItems executes a rename action. If the request data contains a newnames
list then every entry of the files list is renamed to the new name at the
same position; otherwise the item dir/file is renamed to the newname entry.
*/
func (f *fileEndpoint) renameItems(tree *rufs.Tree, dir string, file string, data map[string]interface{}) error {

	newNamesParam, ok := data["newnames"]
	if !ok {

		// Rename the single item given in the URL

		newName, ok := data["newname"]
		if !ok {
			return fmt.Errorf("Parameter newname is missing from request body")
		}

		_, err := tree.ItemOp(dir, map[string]string{
			rufs.ItemOpAction:  rufs.ItemOpActRename,
			rufs.ItemOpName:    file,
			rufs.ItemOpNewName: fmt.Sprint(newName),
		})

		return err
	}

	newNames, ok := newNamesParam.([]interface{})
	if !ok {
		return fmt.Errorf("Parameter newnames must be a list of filenames")
	}

	filesParam, ok := data["files"]
	if !ok {
		return fmt.Errorf("Parameter files is missing from request body")
	}

	filesList, ok := filesParam.([]interface{})
	if !ok {
		return fmt.Errorf("Parameter files must be a list of files")
	}

	// Every file needs a new name - without this check a short newnames
	// list would cause an index out of range panic below

	if len(newNames) < len(filesList) {
		return fmt.Errorf("Parameter newnames must contain a new name for every entry in files")
	}

	for i, item := range filesList {
		itemDir, itemName := path.Split(fmt.Sprint(item))

		if _, err := tree.ItemOp(itemDir, map[string]string{
			rufs.ItemOpAction:  rufs.ItemOpActRename,
			rufs.ItemOpName:    itemName,
			rufs.ItemOpNewName: fmt.Sprint(newNames[i]),
		}); err != nil {
			return err
		}
	}

	return nil
}

/*
runProgressOp runs a long running operation in the background. A failure of
the operation is added to the errors of the progress object registered under
mapLookup. The function waits a little bit (10 milliseconds) for the
operation so immediate errors are directly returned to the caller; nil is
returned if the operation finished without error or is still running.
*/
func runProgressOp(mapLookup string, op func() error) error {

	// The result is handed over via a buffered channel so the goroutine
	// never blocks and no variable is shared with the request handler

	done := make(chan error, 1)

	go func() {
		err := op()

		if err != nil {
			if p, ok := ProgressMap.Get(mapLookup); ok {
				p.(*Progress).Errors = append(p.(*Progress).Errors, err.Error())
			}
		}

		done <- err
	}()

	select {
	case err := <-done:
		return err
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

/*
HandlePOST handles REST calls to create or overwrite a new file.

The request must be a multipart form. All files of the form field uploadfile
are written into the directory given by the resources after the tree name.
If the form contains a redirect value the client is redirected to it after
the upload - only local redirects are accepted.
*/
func (f *fileEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
	var err error
	var tree *rufs.Tree
	var ok bool

	if len(resources) < 1 {
		http.Error(w, "Need a tree name and a file path", http.StatusBadRequest)
		return
	}

	if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Check we have the right request type

	if r.MultipartForm == nil {
		if err = r.ParseMultipartForm(32 << 20); err != nil {
			http.Error(w, fmt.Sprintf("Could not read request body: %v", err.Error()),
				http.StatusBadRequest)
			return
		}
	}

	if r.MultipartForm != nil && r.MultipartForm.File != nil {

		// Check the files are in the form field uploadfile

		files, ok := r.MultipartForm.File["uploadfile"]
		if !ok {
			http.Error(w, "Could not find 'uploadfile' form field", http.StatusBadRequest)
			return
		}

		targetDir := path.Join(resources[1:]...)

		// Write out all send files

		for _, file := range files {
			if err = writeUploadedFile(tree, path.Join(targetDir, file.Filename), file); err != nil {

				// NOTE(review): the message concatenates directory and file
				// name without a separator - kept as is since clients might
				// compare the text

				http.Error(w, fmt.Sprintf("Could not write file %v: %v", targetDir+file.Filename, err.Error()),
					http.StatusBadRequest)
				return
			}
		}
	}

	if redirect := r.PostFormValue("redirect"); redirect != "" {

		// Do the redirect - make sure it is a local redirect

		if err = httputil.CheckLocalRedirect(redirect); err != nil {
			http.Error(w, fmt.Sprintf("Could not redirect: %v", err.Error()), http.StatusBadRequest)
			return
		}

		http.Redirect(w, r, redirect, http.StatusFound)
	}
}

/*
writeUploadedFile writes a single uploaded file to the given path of a tree.
The uploaded file is closed again after it has been written.
*/
func writeUploadedFile(tree *rufs.Tree, target string, header *multipart.FileHeader) error {
	src, err := header.Open()
	if err != nil {
		return err
	}
	defer src.Close()

	return tree.WriteFileFromBuffer(target, src)
}

/*
SwaggerDefs is used to describe the endpoint in swagger.
*/
func (f *fileEndpoint) SwaggerDefs(s map[string]interface{}) {

	// Builders for the definition parts which are shared by all operations -
	// each call returns a fresh map so no state is shared between them

	treeParam := func() map[string]interface{} {
		return map[string]interface{}{
			"name":        "tree",
			"in":          "path",
			"description": "Name of the tree.",
			"required":    true,
			"type":        "string",
		}
	}

	pathParam := func(description string) map[string]interface{} {
		return map[string]interface{}{
			"name":        "path",
			"in":          "path",
			"description": description,
			"required":    true,
			"type":        "string",
		}
	}

	response := func(description string) map[string]interface{} {
		return map[string]interface{}{
			"description": description,
		}
	}

	errorResponse := func() map[string]interface{} {
		return map[string]interface{}{
			"description": "Error response",
			"schema": map[string]interface{}{
				"$ref": "#/definitions/Error",
			},
		}
	}

	s["paths"].(map[string]interface{})["/v1/file/{tree}/{path}"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Read a file.",
			"description": "Return the contents of a file.",
			"produces": []string{
				"text/plain",
				"application/octet-stream",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("File path."),
			},
			"responses": map[string]interface{}{
				"200":     response("Returns the content of the requested file."),
				"default": errorResponse(),
			},
		},
		"put": map[string]interface{}{
			"summary":     "Perform a file operation.",
			"description": "Perform a file operation like rename or copy.",
			"consumes": []string{
				"application/json",
			},
			"produces": []string{
				"text/plain",
				"application/json",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("File path."),
				{
					"name":        "operation",
					"in":          "body",
					"description": "Operation which should be executes",
					"required":    true,
					"schema": map[string]interface{}{
						"type": "object",
						"properties": map[string]interface{}{
							"action": map[string]interface{}{
								"description": "Action to perform.",
								"type":        "string",
								"enum": []string{
									"rename",
									"mkdir",
									"copy",
									"sync",
								},
							},
							"newname": map[string]interface{}{
								"description": "New filename when renaming a single file.",
								"type":        "string",
							},
							"newnames": map[string]interface{}{
								"description": "List of new file names when renaming multiple files using the files parameter.",
								"type":        "array",
								"items": map[string]interface{}{
									"description": "New filename.",
									"type":        "string",
								},
							},
							"destination": map[string]interface{}{
								"description": "Destination directory when copying files.",
								"type":        "string",
							},
							"files": map[string]interface{}{
								"description": "List of (full path) files which should be copied / renamed.",
								"type":        "array",
								"items": map[string]interface{}{
									"description": "File (with full path) which should be copied / renamed.",
									"type":        "string",
								},
							},
						},
					},
				},
			},
			"responses": map[string]interface{}{
				"200":     response("Returns the content of the requested file."),
				"default": errorResponse(),
			},
		},
		"post": map[string]interface{}{
			"summary":     "Upload a file.",
			"description": "Upload or overwrite a file.",
			"produces": []string{
				"text/plain",
			},
			"consumes": []string{
				"multipart/form-data",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("File path."),
				{
					"name":        "redirect",
					"in":          "formData",
					"description": "Page to redirect to after processing the request.",
					"required":    false,
					"type":        "string",
				},
				{
					"name":        "uploadfile",
					"in":          "formData",
					"description": "File(s) to create / overwrite.",
					"required":    true,
					"type":        "file",
				},
			},
			"responses": map[string]interface{}{
				"200":     response("Successful upload no redirect parameter given."),
				"302":     response("Successful upload - redirect according to the given redirect parameter."),
				"default": errorResponse(),
			},
		},
		"delete": map[string]interface{}{
			"summary":     "Delete a file or directory.",
			"description": "Delete a file or directory.",
			"produces": []string{
				"text/plain",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("File or directory path."),
				{
					"name":        "filelist",
					"in":          "body",
					"description": "List of (full path) files which should be deleted",
					"required":    false,
					"schema": map[string]interface{}{
						"type": "array",
						"items": map[string]interface{}{
							"description": "File (with full path) which should be deleted.",
							"type":        "string",
						},
					},
				},
			},
			"responses": map[string]interface{}{
				"200":     response("Returns the content of the requested file."),
				"default": errorResponse(),
			},
		},
	}

	// Add generic error object to definition

	s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{
		"description": "A human readable error mesage.",
		"type":        "string",
	}
}
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package v1 import ( "fmt" "net/http" "strings" "devt.de/krotik/rufs/api" ) /* APIv1 is the directory for version 1 of the API */ const APIv1 = "/v1" /* V1EndpointMap is a map of urls to endpoints for version 1 of the API */ var V1EndpointMap = map[string]api.RestEndpointInst{ EndpointAdmin: AdminEndpointInst, EndpointDir: DirEndpointInst, EndpointFile: FileEndpointInst, EndpointProgress: ProgressEndpointInst, EndpointZip: ZipEndpointInst, } // Helper functions // ================ /* checkResources check given resources for a GET request. */ func checkResources(w http.ResponseWriter, resources []string, requiredMin int, requiredMax int, errorMsg string) bool { if len(resources) < requiredMin { http.Error(w, errorMsg, http.StatusBadRequest) return false } else if len(resources) > requiredMax { http.Error(w, "Invalid resource specification: "+strings.Join(resources[1:], "/"), http.StatusBadRequest) return false } return true } /* getMapValue extracts a value from a given map. */ func getMapValue(w http.ResponseWriter, data map[string]interface{}, key string) (string, bool) { if val, ok := data[key]; ok && val != "" { return fmt.Sprint(val), true } http.Error(w, fmt.Sprintf("Value for %v is missing in posted data", key), http.StatusBadRequest) return "", false }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package v1 import ( "archive/zip" "bytes" "encoding/json" "fmt" "net/http" "devt.de/krotik/rufs" "devt.de/krotik/rufs/api" ) /* EndpointZip is the zip endpoint URL (rooted). Handles everything under zip/... */ const EndpointZip = api.APIRoot + APIv1 + "/zip/" /* ZipEndpointInst creates a new endpoint handler. */ func ZipEndpointInst() api.RestEndpointHandler { return &zipEndpoint{} } /* Handler object for zip operations. */ type zipEndpoint struct { *api.DefaultEndpointHandler } /* HandlePOST handles a zip query REST call. */ func (z *zipEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) { var tree *rufs.Tree var data []string var ok bool var err error if !checkResources(w, resources, 1, 1, "Need a tree name") { return } if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok { http.Error(w, fmt.Sprintf("Unknown tree: %v", resources[0]), http.StatusBadRequest) return } if err = r.ParseForm(); err == nil { files := r.Form["files"] if len(files) == 0 { err = fmt.Errorf("Field 'files' should be a list of files as JSON encoded string") } else { err = json.NewDecoder(bytes.NewBufferString(files[0])).Decode(&data) } } if err != nil { http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()), http.StatusBadRequest) return } w.Header().Set("content-type", "application/octet-stream") w.Header().Set("content-disposition", `attachment; filename="files.zip"`) // Go through the list of files and stream the zip file zipW := zip.NewWriter(w) for _, f := range data { writer, _ := zipW.Create(f) tree.ReadFileToBuffer(f, writer) } zipW.Close() } /* SwaggerDefs is used to describe the endpoint in swagger. 
*/ func (z *zipEndpoint) SwaggerDefs(s map[string]interface{}) { s["paths"].(map[string]interface{})["/v1/zip/{tree}"] = map[string]interface{}{ "post": map[string]interface{}{ "summary": "Create zip file from a list of files.", "description": "Combine a list of given files into a single zip file.", "produces": []string{ "text/plain", }, "consumes": []string{ "application/x-www-form-urlencoded", }, "parameters": []map[string]interface{}{ { "name": "tree", "in": "path", "description": "Name of the tree.", "required": true, "type": "string", }, { "name": "files", "in": "body", "description": "JSON encoded list of (full path) files which should be zipped up", "required": true, "schema": map[string]interface{}{ "type": "array", "items": map[string]interface{}{ "description": "File (with full path) which should be included in the zip file.", "type": "string", }, }, }, }, "responses": map[string]interface{}{ "200": map[string]interface{}{ "description": "Returns the content of the requested file.", }, "default": map[string]interface{}{ "description": "Error response", "schema": map[string]interface{}{ "$ref": "#/definitions/Error", }, }, }, }, } }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ /* Package rufs contains the main API to Rufs. Rufs is organized as a collection of branches. Each branch represents a physical file system structure which can be queried and updated by an authorized client. On the client side one or several branches are organized into a tree. The single branches can overlay each other. For example: Branch A /foo/A /foo/B /bar/C Branch B /foo/C /test/D Tree 1 /myspace => Branch A, Branch B Accessing tree with: /myspace/foo/A gets file /foo/A from Branch A while /myspace/foo/C gets file /foo/C from Branch B Write operations go only to branches which are mapped as writing branches and who accept them (i.e. are not set to readonly on the side of the branch). */ package rufs import ( "bytes" "crypto/tls" "encoding/gob" "fmt" "io" "io/ioutil" "os" "path" "path/filepath" "regexp" "strconv" "strings" "devt.de/krotik/common/errorutil" "devt.de/krotik/common/fileutil" "devt.de/krotik/common/pools" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs/config" "devt.de/krotik/rufs/node" ) func init() { // Make sure we can use the relevant types in a gob operation gob.Register([][]os.FileInfo{}) gob.Register(&FileInfo{}) } /* Branch models a single exported branch in Rufs. */ type Branch struct { rootPath string // Local directory (absolute path) modeling the branch root node *node.RufsNode // Local RPC node readonly bool // Flag if this branch is readonly } /* NewBranch returns a new exported branch. 
*/ func NewBranch(cfg map[string]interface{}, cert *tls.Certificate) (*Branch, error) { var err error var b *Branch // Make sure the given config is ok if err = config.CheckBranchExportConfig(cfg); err == nil { // Create RPC server addr := fmt.Sprintf("%v:%v", fileutil.ConfStr(cfg, config.RPCHost), fileutil.ConfStr(cfg, config.RPCPort)) rn := node.NewNode(addr, fileutil.ConfStr(cfg, config.BranchName), fileutil.ConfStr(cfg, config.BranchSecret), cert, nil) // Start the rpc server if err = rn.Start(cert); err == nil { var rootPath string // Construct root path if rootPath, err = filepath.Abs(fileutil.ConfStr(cfg, config.LocalFolder)); err == nil { b = &Branch{rootPath, rn, fileutil.ConfBool(cfg, config.EnableReadOnly)} rn.DataHandler = b.requestHandler } } } return b, err } /* Name returns the name of the branch. */ func (b *Branch) Name() string { return b.node.Name() } /* SSLFingerprint returns the SSL fingerprint of the branch. */ func (b *Branch) SSLFingerprint() string { return b.node.SSLFingerprint() } /* Shutdown shuts the branch down. */ func (b *Branch) Shutdown() error { return b.node.Shutdown() } /* IsReadOnly returns if this branch is read-only. */ func (b *Branch) IsReadOnly() bool { return b.readonly } /* checkReadOnly returns an error if this branch is read-only. */ func (b *Branch) checkReadOnly() error { var err error if b.IsReadOnly() { err = fmt.Errorf("Branch %v is read-only", b.Name()) } return err } // Branch API // ========== /* Dir returns file listings matching a given pattern of one or more directories. The contents of the given path is returned along with checksums if the checksum flag is specified. Optionally, also the contents of all subdirectories can be returned if the recursive flag is set. The return values is a list of traversed directories (platform-agnostic) and their corresponding contents. 
*/ func (b *Branch) Dir(spath string, pattern string, recursive bool, checksums bool) ([]string, [][]os.FileInfo, error) { var fis []os.FileInfo // Compile pattern re, err := regexp.Compile(pattern) if err != nil { return nil, nil, err } createRufsFileInfos := func(dirname string, afis []os.FileInfo) []os.FileInfo { var fis []os.FileInfo fis = make([]os.FileInfo, 0, len(afis)) for _, fi := range afis { // Append if it matches the pattern if re.MatchString(fi.Name()) { fis = append(fis, fi) } } // Wrap normal file infos and calculate checksum if necessary ret := WrapFileInfos(dirname, fis) if checksums { for _, fi := range fis { if !fi.IsDir() { // The sum is either there or not ... - access errors should // be caught when trying to read the file sum, _ := fileutil.CheckSumFileFast(filepath.Join(dirname, fi.Name())) fi.(*FileInfo).FiChecksum = sum } } } return ret } subPath, err := b.constructSubPath(spath) if err == nil { if !recursive { if fis, err = ioutil.ReadDir(subPath); err == nil { return []string{spath}, [][]os.FileInfo{createRufsFileInfos(subPath, fis)}, nil } } else { var rpaths []string var rfis [][]os.FileInfo var addSubDir func(string, string) error // Recursive function to walk directories and symlinks // in a platform-agnostic way addSubDir = func(p string, rp string) error { fis, err = ioutil.ReadDir(p) if err == nil { rpaths = append(rpaths, rp) rfis = append(rfis, createRufsFileInfos(p, fis)) for _, fi := range fis { if err == nil && fi.IsDir() { err = addSubDir(filepath.Join(p, fi.Name()), path.Join(rp, fi.Name())) } } } return err } if err = addSubDir(subPath, spath); err == nil { return rpaths, rfis, nil } } } // Ignore any not exists errors if os.IsNotExist(err) { err = nil } return nil, nil, err } /* ReadFileToBuffer reads a complete file into a given buffer which implements io.Writer. 
*/ func (b *Branch) ReadFileToBuffer(spath string, buf io.Writer) error { var n int var err error var offset int64 readBuf := make([]byte, DefaultReadBufferSize) for err == nil { n, err = b.ReadFile(spath, readBuf, offset) if err == nil { _, err = buf.Write(readBuf[:n]) offset += int64(n) } else if IsEOF(err) { // We reached the end of the file err = nil break } } return err } /* ReadFile reads up to len(p) bytes into p from the given offset. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. */ func (b *Branch) ReadFile(spath string, p []byte, offset int64) (int, error) { var n int subPath, err := b.constructSubPath(spath) if err == nil { var fi os.FileInfo if fi, err = os.Stat(subPath); err == nil { if fi.IsDir() { err = fmt.Errorf("read /%v: is a directory", spath) } else if err == nil { var f *os.File if f, err = os.Open(subPath); err == nil { defer f.Close() sr := io.NewSectionReader(f, 0, fi.Size()) if _, err = sr.Seek(offset, io.SeekStart); err == nil { n, err = sr.Read(p) } } } } } return n, err } /* WriteFileFromBuffer writes a complete file from a given buffer which implements io.Reader. */ func (b *Branch) WriteFileFromBuffer(spath string, buf io.Reader) error { var err error var offset int64 if err = b.checkReadOnly(); err == nil { writeBuf := make([]byte, DefaultReadBufferSize) for err == nil { var n int if n, err = buf.Read(writeBuf); err == nil { _, err = b.WriteFile(spath, writeBuf[:n], offset) offset += int64(n) } else if IsEOF(err) { // We reached the end of the file b.WriteFile(spath, []byte{}, offset) err = nil break } } } return err } /* WriteFile writes p into the given file from the given offset. It returns the number of written bytes and any error encountered. 
*/ func (b *Branch) WriteFile(spath string, p []byte, offset int64) (int, error) { var n int var m int64 if err := b.checkReadOnly(); err != nil { return 0, err } buf := byteSlicePool.Get().([]byte) defer func() { byteSlicePool.Put(buf) }() growFile := func(f *os.File, n int64) { var err error toWrite := n for err == nil && toWrite > 0 { if toWrite > int64(DefaultReadBufferSize) { _, err = f.Write(buf[:DefaultReadBufferSize]) toWrite -= int64(DefaultReadBufferSize) } else { _, err = f.Write(buf[:toWrite]) toWrite = 0 } } } subPath, err := b.constructSubPath(spath) if err == nil { var fi os.FileInfo var f *os.File if fi, err = os.Stat(subPath); os.IsNotExist(err) { // Ensure path exists dir, _ := filepath.Split(subPath) if err = os.MkdirAll(dir, 0755); err == nil { // Create the file newly if f, err = os.OpenFile(subPath, os.O_RDWR|os.O_CREATE, 0644); err == nil { defer f.Close() if offset > 0 { growFile(f, offset) } m, err = io.Copy(f, bytes.NewBuffer(p)) n += int(m) } } } else if err == nil { // File does exist if f, err := os.OpenFile(subPath, os.O_RDWR, 0644); err == nil { defer f.Close() if fi.Size() < offset { f.Seek(fi.Size(), io.SeekStart) growFile(f, offset-fi.Size()) } else { f.Seek(offset, io.SeekStart) } m, err = io.Copy(f, bytes.NewBuffer(p)) errorutil.AssertOk(err) n += int(m) } } } return n, err } /* ItemOp parameter */ const ( ItemOpAction = "itemop_action" // ItemOp action ItemOpName = "itemop_name" // Item name ItemOpNewName = "itemop_newname" // New item name ) /* ItemOp actions */ const ( ItemOpActRename = "rename" // Rename a file or directory ItemOpActDelete = "delete" // Delete a file or directory ItemOpActMkDir = "mkdir" // Create a directory ) /* ItemOp executes a file or directory specific operation which can either succeed or fail (e.g. rename or delete). Actions and parameters should be given in the opdata map. 
*/ func (b *Branch) ItemOp(spath string, opdata map[string]string) (bool, error) { res := false if err := b.checkReadOnly(); err != nil { return false, err } subPath, err := b.constructSubPath(spath) if err == nil { action := opdata[ItemOpAction] fileFromOpData := func(key string) (string, error) { // Make sure we are only dealing with files _, name := filepath.Split(opdata[key]) if name == "" { return "", fmt.Errorf("This operation requires a specific file or directory") } // Build the relative paths return filepath.Join(filepath.FromSlash(subPath), name), nil } if action == ItemOpActMkDir { var name string // Make directory action if name, err = fileFromOpData(ItemOpName); err == nil { err = os.MkdirAll(name, 0755) } } else if action == ItemOpActRename { var name, newname string // Rename action if name, err = fileFromOpData(ItemOpName); err == nil { if newname, err = fileFromOpData(ItemOpNewName); err == nil { err = os.Rename(name, newname) } } } else if action == ItemOpActDelete { var name string // Delete action if name, err = fileFromOpData(ItemOpName); err == nil { del := func(name string) error { var err error if ok, _ := fileutil.PathExists(name); ok { err = os.RemoveAll(name) } else { err = os.ErrNotExist } return err } if strings.Contains(name, "*") { var rex string // We have a wildcard rootdir, glob := filepath.Split(name) // Create a regex from the given glob expression if rex, err = stringutil.GlobToRegex(glob); err == nil { var dirs []string var fis [][]os.FileInfo if dirs, fis, err = b.Dir(spath, rex, true, false); err == nil { for i, dir := range dirs { // Remove all files and dirs according to the wildcard for _, fi := range fis[i] { os.RemoveAll(filepath.Join(rootdir, filepath.FromSlash(dir), fi.Name())) } } } } } else { err = del(name) } } } // Determine if we succeeded res = err == nil || os.IsNotExist(err) } return res, err } // Request handling functions // ========================== /* DefaultReadBufferSize is the default size for file 
reading. */ var DefaultReadBufferSize = 1024 * 16 /* bufferPool holds buffers which are used to marshal objects. */ var bufferPool = pools.NewByteBufferPool() /* byteSlicePool holds buffers which are used to read files */ var byteSlicePool = pools.NewByteSlicePool(DefaultReadBufferSize) /* Meta parameter */ const ( ParamAction = "a" // Requested action ParamPath = "p" // Path string ParamPattern = "x" // Pattern string ParamRecursive = "r" // Recursive flag ParamChecksums = "c" // Checksum flag ParamOffset = "o" // Offset parameter ParamSize = "s" // Size parameter ) /* Possible actions */ const ( OpDir = "dir" // Read the contents of a path OpRead = "read" // Read the contents of a file OpWrite = "write" // Read the contents of a file OpItemOp = "itemop" // File or directory operation ) /* requestHandler handles incoming requests from other branches or trees. */ func (b *Branch) requestHandler(ctrl map[string]string, data []byte) ([]byte, error) { var err error var res interface{} var ret []byte action := ctrl[ParamAction] // Handle operation requests if action == OpDir { var dirs []string var fis [][]os.FileInfo dir := ctrl[ParamPath] pattern := ctrl[ParamPattern] rec := strings.ToLower(ctrl[ParamRecursive]) == "true" sum := strings.ToLower(ctrl[ParamChecksums]) == "true" if dirs, fis, err = b.Dir(dir, pattern, rec, sum); err == nil { res = []interface{}{dirs, fis} } } else if action == OpItemOp { res, err = b.ItemOp(ctrl[ParamPath], ctrl) } else if action == OpRead { var size, n int var offset int64 spath := ctrl[ParamPath] if size, err = strconv.Atoi(ctrl[ParamSize]); err == nil { if offset, err = strconv.ParseInt(ctrl[ParamOffset], 10, 64); err == nil { buf := byteSlicePool.Get().([]byte) defer func() { byteSlicePool.Put(buf) }() if len(buf) < size { // Constantly requesting bigger buffers will // eventually replace all default sized buffers buf = make([]byte, size) } if n, err = b.ReadFile(spath, buf[:size], offset); err == nil { res = []interface{}{n, 
buf[:size]} } } } } else if action == OpWrite { var offset int64 spath := ctrl[ParamPath] if offset, err = strconv.ParseInt(ctrl[ParamOffset], 10, 64); err == nil { res, err = b.WriteFile(spath, data, offset) } } // Send the response if err == nil { // Allocate a new encoding buffer - no need to lock as // it is based on sync.Pool // Pooled encoding buffers are used to keep expensive buffer // reallocations to a minimum. It is better to allocate the // actual response buffer once the response size is known. bb := bufferPool.Get().(*bytes.Buffer) if err = gob.NewEncoder(bb).Encode(res); err == nil { toSend := bb.Bytes() // Allocate the response array ret = make([]byte, len(toSend)) // Copy encoded result into the response array copy(ret, toSend) } // Return the encoding buffer back to the pool go func() { bb.Reset() bufferPool.Put(bb) }() } if err != nil { // Ensure we don't leak local paths - this might not work in // all situations and depends on the underlying os. In this // error messages might include information on the full local // path in error messages. absRoot, _ := filepath.Abs(b.rootPath) err = fmt.Errorf("%v", strings.Replace(err.Error(), absRoot, "", -1)) } return ret, err } // Util functions // ============== func (b *Branch) constructSubPath(rpath string) (string, error) { // Produce the actual subpath - this should also produce windows // paths correctly (i.e. foo/bar -> C:\root\foo\bar) subPath := filepath.Join(b.rootPath, filepath.FromSlash(rpath)) // Check that the new sub path is under the root path absSubPath, err := filepath.Abs(subPath) if err == nil { if strings.HasPrefix(absSubPath, b.rootPath) { return subPath, nil } err = fmt.Errorf("Requested path %v is outside of the branch", rpath) } return "", err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package config import "fmt" /* ProductVersion is the current version of Rufs */ const ProductVersion = "1.1.1" /* Defaut configuration keys */ const ( // Branch configuration (export) BranchName = "BranchName" BranchSecret = "BranchSecret" EnableReadOnly = "EnableReadOnly" RPCHost = "RPCHost" RPCPort = "RPCPort" LocalFolder = "LocalFolder" // Tree configuration TreeSecret = "TreeSecret" ) /* DefaultBranchExportConfig is the default configuration for an exported branch */ var DefaultBranchExportConfig = map[string]interface{}{ BranchName: "", // Auto name (based on available network interface) BranchSecret: "", // Secret needs to be provided by the client EnableReadOnly: false, // FS access is readonly for clients RPCHost: "", // Auto (first available external interface) RPCPort: "9020", // Communication port for this branch LocalFolder: "share", // Local folder which is being made available } /* DefaultTreeConfig is the default configuration for a tree which imports branches */ var DefaultTreeConfig = map[string]interface{}{ TreeSecret: "", // Secret needs to be provided by the client } // Helper functions // ================ /* CheckBranchExportConfig checks a given branch export config. */ func CheckBranchExportConfig(config map[string]interface{}) error { for k := range DefaultBranchExportConfig { if _, ok := config[k]; !ok { return fmt.Errorf("Missing %v key in branch export config", k) } } return nil } /* CheckTreeConfig checks a given tree config. */ func CheckTreeConfig(config map[string]interface{}) error { for k := range DefaultTreeConfig { if _, ok := config[k]; !ok { return fmt.Errorf("Missing %v key in tree config", k) } } return nil }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package rufs import ( "fmt" "os" "path/filepath" "time" ) /* Special unit test flag - use a common mode to gloss over OS specific defaults */ var unitTestModes = false /* FileInfo implements os.FileInfo in an platform-agnostic way */ type FileInfo struct { FiName string // Base name FiSize int64 // Size in bytes FiMode os.FileMode // File mode bits FiModTime time.Time // Modification time FiChecksum string // Checksum of files // Private fields which will not be transferred via RPC isSymLink bool // Flag if this is a symlink (unix) symLinkTarget string // Target file/directory of the symlink } /* WrapFileInfo wraps a single os.FileInfo object in a serializable FileInfo. */ func WrapFileInfo(path string, i os.FileInfo) os.FileInfo { var realPath string // Check if we have a symlink mode := i.Mode() size := i.Size() isSymlink := i.Mode()&os.ModeSymlink != 0 if isSymlink { var err error if realPath, err = filepath.EvalSymlinks(filepath.Join(path, i.Name())); err == nil { var ri os.FileInfo if ri, err = os.Stat(realPath); err == nil { // Write in the size of the target and file mode mode = ri.Mode() size = ri.Size() } } } // Unit test fixed file modes if unitTestModes { mode = mode & os.ModeDir if mode.IsDir() { mode = mode | 0777 size = 4096 } else { mode = mode | 0666 } } return &FileInfo{i.Name(), size, mode, i.ModTime(), "", isSymlink, realPath} } /* WrapFileInfos wraps a list of os.FileInfo objects into a list of serializable FileInfo objects. This function will modify the given list. */ func WrapFileInfos(path string, is []os.FileInfo) []os.FileInfo { for i, info := range is { is[i] = WrapFileInfo(path, info) } return is } /* Name returns the base name. 
*/ func (rfi *FileInfo) Name() string { return rfi.FiName } /* Size returns the length in bytes. */ func (rfi *FileInfo) Size() int64 { return rfi.FiSize } /* Mode returns the file mode bits. */ func (rfi *FileInfo) Mode() os.FileMode { return rfi.FiMode } /* ModTime returns the modification time. */ func (rfi *FileInfo) ModTime() time.Time { return rfi.FiModTime } /* Checksum returns the checksum of this file. May be an empty string if it was not calculated. */ func (rfi *FileInfo) Checksum() string { return rfi.FiChecksum } /* IsDir returns if this is a directory. */ func (rfi *FileInfo) IsDir() bool { return rfi.FiMode.IsDir() } /* Sys should return the underlying data source but will always return nil for FileInfo nodes. */ func (rfi *FileInfo) Sys() interface{} { return nil } func (rfi *FileInfo) String() string { sum := rfi.Checksum() if sum != "" { return fmt.Sprintf("%v %s [%v] %v (%v) - %v", rfi.Name(), sum, rfi.Size(), rfi.Mode(), rfi.ModTime(), rfi.Sys()) } return fmt.Sprintf("%v [%v] %v (%v) - %v", rfi.Name(), rfi.Size(), rfi.Mode(), rfi.ModTime(), rfi.Sys()) }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ /* Package rumble contains Rumble functions which interface with Rufs. */ package rumble import ( "fmt" "os" "regexp" "devt.de/krotik/common/defs/rumble" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs/api" ) // Function: dir // ============= /* DirFunc queries a directory in a tree. */ type DirFunc struct { } /* Name returns the name of the function. */ func (f *DirFunc) Name() string { return "fs.dir" } /* Validate is called for parameter validation and to reset the function state. */ func (f *DirFunc) Validate(argsNum int, rt rumble.Runtime) rumble.RuntimeError { var err rumble.RuntimeError if argsNum != 3 && argsNum != 4 { err = rt.NewRuntimeError(rumble.ErrInvalidConstruct, "Function dir requires 3 or 4 parameters: tree, a path, a glob expression and optionally a recursive flag") } return err } /* Execute executes the rumble function. 
*/ func (f *DirFunc) Execute(argsVal []interface{}, vars rumble.Variables, rt rumble.Runtime) (interface{}, rumble.RuntimeError) { var res interface{} var paths []string var fiList [][]os.FileInfo treeName := fmt.Sprint(argsVal[0]) path := fmt.Sprint(argsVal[1]) pattern := fmt.Sprint(argsVal[2]) recursive := argsVal[3] == true conv := func(re *regexp.Regexp, fis []os.FileInfo) []interface{} { r := make([]interface{}, 0, len(fis)) for _, fi := range fis { if !fi.IsDir() && !re.MatchString(fi.Name()) { continue } r = append(r, map[interface{}]interface{}{ "name": fi.Name(), "mode": fmt.Sprint(fi.Mode()), "modtime": fmt.Sprint(fi.ModTime()), "isdir": fi.IsDir(), "size": fi.Size(), }) } return r } tree, ok, err := api.GetTree(treeName) if !ok { if err == nil { err = fmt.Errorf("Unknown tree: %v", treeName) } } if err == nil { var globPattern string // Create regex for files if globPattern, err = stringutil.GlobToRegex(pattern); err == nil { var re *regexp.Regexp if re, err = regexp.Compile(globPattern); err == nil { // Query the file system paths, fiList, err = tree.Dir(path, "", recursive, false) pathData := make([]interface{}, 0, len(paths)) fisData := make([]interface{}, 0, len(paths)) // Convert the result into a Rumble data structure for i := range paths { fis := conv(re, fiList[i]) // If we have a regex then only include directories which have files pathData = append(pathData, paths[i]) fisData = append(fisData, fis) } res = []interface{}{pathData, fisData} } } } if err != nil { // Wrap error message in RuntimeError err = rt.NewRuntimeError(rumble.ErrInvalidState, fmt.Sprintf("Cannot list files: %v", err.Error())) } return res, err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ /* Package node contains the network communication code for Rufs via RPC calls. */ package node import ( "crypto/tls" "encoding/gob" "fmt" "io" "net" "net/rpc" "os" "sort" "strings" "sync" "time" ) func init() { // Make sure we can use the relevant types in a gob operation gob.Register(&RufsNodeToken{}) gob.Register(map[string]string{}) } /* DialTimeout is the dial timeout for RPC connections */ var DialTimeout = 10 * time.Second /* RufsNodeToken is used to authenticate a node in the network to other nodes */ type RufsNodeToken struct { NodeName string NodeAuth string } /* Client is the client for the RPC API of a node. */ type Client struct { token *RufsNodeToken // Token to be send to other nodes for authentication rpc string // This client's rpc network interface (may be empty in case of pure clients) peers map[string]string // Map of node names to their rpc network interface conns map[string]*rpc.Client // Map of node names to network connections fingerprints map[string]string // Map of expected server certificate fingerprints cert *tls.Certificate // Client certificate maplock *sync.RWMutex // Lock for maps redial bool // Flag if this client is attempting a redial } /* SSLFingerprint returns the SSL fingerprint of the client. */ func (c *Client) SSLFingerprint() string { var ret string if c.cert != nil && c.cert.Certificate[0] != nil { ret = fingerprint(c.cert.Certificate[0]) } return ret } /* Shutdown closes all stored connections. */ func (c *Client) Shutdown() { c.maplock.Lock() defer c.maplock.Unlock() for _, c := range c.conns { c.Close() } c.conns = make(map[string]*rpc.Client) } /* RegisterPeer registers a new peer to communicate with. 
An empty fingerprint means that the client will accept any certificate from the server. */ func (c *Client) RegisterPeer(node string, rpc string, fingerprint string) error { if _, ok := c.peers[node]; ok { return fmt.Errorf("Peer already registered: %v", node) } else if rpc == "" { return fmt.Errorf("RPC interface must not be empty") } c.maplock.Lock() c.peers[node] = rpc delete(c.conns, node) c.fingerprints[node] = fingerprint c.maplock.Unlock() return nil } /* Peers returns all registered peers and their expected fingerprints. */ func (c *Client) Peers() ([]string, []string) { ret := make([]string, 0, len(c.peers)) fps := make([]string, len(c.peers)) c.maplock.Lock() defer c.maplock.Unlock() for k := range c.peers { ret = append(ret, k) } sort.Strings(ret) for i, node := range ret { fps[i] = c.fingerprints[node] } return ret, fps } /* RemovePeer removes a registered peer. */ func (c *Client) RemovePeer(node string) { c.maplock.Lock() delete(c.peers, node) delete(c.conns, node) delete(c.fingerprints, node) c.maplock.Unlock() } /* SendPing sends a ping to a node and returns the result. Second argument is optional if the target member is not a known peer. Should be an empty string in all other cases. Returns the answer, the fingerprint of the presented server certificate and any errors. */ func (c *Client) SendPing(node string, rpc string) ([]string, string, error) { var ret []string var fp string if _, ok := c.peers[node]; !ok && rpc != "" { // Add member temporary if it was not registered c.maplock.Lock() c.peers[node] = rpc c.maplock.Unlock() defer func() { c.maplock.Lock() delete(c.peers, node) delete(c.conns, node) delete(c.fingerprints, node) c.maplock.Unlock() }() } res, err := c.SendRequest(node, RPCPing, nil) if res != nil && err == nil { ret = res.([]string) c.maplock.Lock() fp = c.fingerprints[node] c.maplock.Unlock() } return ret, fp, err } /* SendData sends a portion of data and some control information to a node and returns the result. 
*/ func (c *Client) SendData(node string, ctrl map[string]string, data []byte) ([]byte, error) { if _, ok := c.peers[node]; !ok { return nil, fmt.Errorf("Unknown peer: %v", node) } res, err := c.SendRequest(node, RPCData, map[RequestArgument]interface{}{ RequestCTRL: ctrl, RequestDATA: data, }) if res != nil { return res.([]byte), err } return nil, err } /* SendRequest sends a request to another node. */ func (c *Client) SendRequest(node string, remoteCall RPCFunction, args map[RequestArgument]interface{}) (interface{}, error) { var err error // Function to categorize errors handleError := func(err error) error { if _, ok := err.(net.Error); ok { return &Error{ErrNodeComm, err.Error(), false} } // Wrap remote errors in a proper error object if err != nil && !strings.HasPrefix(err.Error(), "RufsError: ") { // Check if the error is known to report that a file or directory // does not exist. err = &Error{ErrRemoteAction, err.Error(), err.Error() == os.ErrNotExist.Error()} } return err } c.maplock.Lock() laddr, ok := c.peers[node] c.maplock.Unlock() if ok { // Get network connection to the node c.maplock.Lock() conn, ok := c.conns[node] c.maplock.Unlock() if !ok { // Create a new connection if necessary nconn, err := net.DialTimeout("tcp", laddr, DialTimeout) if err != nil { LogDebug(c.token.NodeName, ": ", fmt.Sprintf("- %v.%v (laddr=%v err=%v)", node, remoteCall, laddr, err)) return nil, handleError(err) } if c.cert != nil && c.cert.Certificate[0] != nil { // Wrap the conn in a TLS client config := tls.Config{ Certificates: []tls.Certificate{*c.cert}, InsecureSkipVerify: true, } tlsconn := tls.Client(nconn, &config) // Do the handshake and look at the server certificate tlsconn.Handshake() rfp := fingerprint(tlsconn.ConnectionState().PeerCertificates[0].Raw) c.maplock.Lock() expected, _ := c.fingerprints[node] c.maplock.Unlock() if expected == "" { // Accept the certificate and store it c.maplock.Lock() c.fingerprints[node] = rfp c.maplock.Unlock() } else if expected 
!= rfp { // Fingerprint was NOT verified LogDebug(c.token.NodeName, ": ", fmt.Sprintf("Not trusting %v (laddr=%v) presented fingerprint: %v expected fingerprint: %v", node, laddr, rfp, expected)) return nil, &Error{ErrUntrustedTarget, node, false} } LogDebug(c.token.NodeName, ": ", fmt.Sprintf("%v (laddr=%v) has SSL fingerprint %v ", node, laddr, rfp)) nconn = tlsconn } conn = rpc.NewClient(nconn) // Store the connection so it can be reused c.maplock.Lock() c.conns[node] = conn c.maplock.Unlock() } // Assemble the request request := map[RequestArgument]interface{}{ RequestTARGET: node, RequestTOKEN: c.token, } if args != nil { for k, v := range args { request[k] = v } } var response interface{} LogDebug(c.token.NodeName, ": ", fmt.Sprintf("> %v.%v (laddr=%v)", node, remoteCall, laddr)) err = conn.Call("RufsServer."+string(remoteCall), request, &response) if !c.redial && (err == rpc.ErrShutdown || err == io.EOF || err == io.ErrUnexpectedEOF) { // Delete the closed connection and retry the request c.maplock.Lock() delete(c.conns, node) c.redial = true // Set the redial flag to avoid a forever loop c.maplock.Unlock() return c.SendRequest(node, remoteCall, args) } // Reset redial flag c.maplock.Lock() c.redial = false c.maplock.Unlock() LogDebug(c.token.NodeName, ": ", fmt.Sprintf("< %v.%v (err=%v)", node, remoteCall, err)) return response, handleError(err) } return nil, &Error{ErrUnknownTarget, node, false} }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package node import ( "errors" "fmt" "log" ) // Logging // ======= /* Logger is a function which processes log messages */ type Logger func(v ...interface{}) /* LogInfo is called if an info message is logged */ var LogInfo = Logger(log.Print) /* LogDebug is called if a debug message is logged (by default disabled) */ var LogDebug = Logger(LogNull) /* LogNull is a discarding logger to be used for disabling loggers */ var LogNull = func(v ...interface{}) { } // Errors // ====== /* Error is a network related error */ type Error struct { Type error // Error type (to be used for equal checks) Detail string // Details of this error IsNotExist bool // Error is file or directory does not exist } /* Error returns a human-readable string representation of this error. */ func (ge *Error) Error() string { if ge.Detail != "" { return fmt.Sprintf("RufsError: %v (%v)", ge.Type, ge.Detail) } return fmt.Sprintf("RufsError: %v", ge.Type) } /* Network related error types */ var ( ErrNodeComm = errors.New("Network error") ErrRemoteAction = errors.New("Remote error") ErrUnknownTarget = errors.New("Unknown target node") ErrUntrustedTarget = errors.New("Unexpected SSL certificate from target node") ErrInvalidToken = errors.New("Invalid node token") )
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package node import ( "crypto/sha512" "crypto/tls" "fmt" "net" "net/rpc" "sync" ) /* RequestHandler is a function to handle incoming requests. A request has a control object which contains information on what the data is and how it should be used and the data itself. The request handler should return the result or an error. */ type RequestHandler func(ctrl map[string]string, data []byte) ([]byte, error) /* RufsNode is the management object for a node in the Rufs network. A RufsNode registers itself to the rpc server which is the global server object. Each node needs to have a unique name. Communication between nodes is secured by using a secret string which is never exchanged over the network and a hash generated token which identifies a member. Each RufsNode object contains a Client object which can be used to communicate with other nodes. This object should be used by pure clients - code which should communicate with the cluster without running an actual member. */ type RufsNode struct { name string // Name of the node secret string // Network wide secret Client *Client // RPC client object listener net.Listener // RPC server listener wg sync.WaitGroup // RPC server Waitgroup for listener shutdown DataHandler RequestHandler // Handler function for data requests cert *tls.Certificate // Node certificate } /* NewNode create a new RufsNode object. 
*/ func NewNode(rpcInterface string, name string, secret string, clientCert *tls.Certificate, dataHandler RequestHandler) *RufsNode { // Generate node token token := &RufsNodeToken{name, fmt.Sprintf("%X", sha512.Sum512_224([]byte(name+secret)))} rn := &RufsNode{name, secret, &Client{token, rpcInterface, make(map[string]string), make(map[string]*rpc.Client), make(map[string]string), clientCert, &sync.RWMutex{}, false}, nil, sync.WaitGroup{}, dataHandler, clientCert} return rn } /* NewClient create a new Client object. */ func NewClient(secret string, clientCert *tls.Certificate) *Client { return NewNode("", "", secret, clientCert, nil).Client } // General node API // ================ /* Name returns the name of the node. */ func (rn *RufsNode) Name() string { return rn.name } /* SSLFingerprint returns the SSL fingerprint of the node. */ func (rn *RufsNode) SSLFingerprint() string { var ret string if rn.cert != nil && rn.cert.Certificate[0] != nil { ret = fingerprint(rn.cert.Certificate[0]) } return ret } /* LogInfo logs a node related message at info level. */ func (rn *RufsNode) LogInfo(v ...interface{}) { LogInfo(rn.name, ": ", fmt.Sprint(v...)) } /* Start starts process for this node. 
*/ func (rn *RufsNode) Start(serverCert *tls.Certificate) error { if _, ok := rufsServer.nodes[rn.name]; ok { return fmt.Errorf("Cannot start node %s twice", rn.name) } rn.LogInfo("Starting node ", rn.name, " rpc server on: ", rn.Client.rpc) l, err := net.Listen("tcp", rn.Client.rpc) if err != nil { return err } if serverCert != nil && serverCert.Certificate[0] != nil { rn.cert = serverCert rn.LogInfo("SSL fingerprint: ", rn.SSLFingerprint()) // Wrap the listener in a TLS listener config := tls.Config{Certificates: []tls.Certificate{*serverCert}} l = tls.NewListener(l, &config) } // Kick of the rpc listener go func() { rpc.Accept(l) rn.wg.Done() rn.LogInfo("Connection closed: ", rn.Client.rpc) }() rn.listener = l // Register this node in the global server map rufsServer.nodes[rn.name] = rn return nil } /* Shutdown shuts the member manager rpc server for this cluster member down. */ func (rn *RufsNode) Shutdown() error { var err error // Close socket if rn.listener != nil { rn.LogInfo("Shutdown rpc server on: ", rn.Client.rpc) rn.wg.Add(1) err = rn.listener.Close() rn.Client.Shutdown() rn.listener = nil rn.wg.Wait() delete(rufsServer.nodes, rn.name) } else { LogDebug("Node ", rn.name, " already shut down") } return err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package node import ( "bytes" "crypto/sha256" "crypto/sha512" "fmt" "net/rpc" "devt.de/krotik/common/errorutil" ) func init() { // Create singleton Server instance. rufsServer = &RufsServer{make(map[string]*RufsNode)} // Register the cluster API as RPC server errorutil.AssertOk(rpc.Register(rufsServer)) } /* RPCFunction is used to identify the called function in a RPC call */ type RPCFunction string /* List of all possible RPC functions. The list includes all RPC callable functions in this file. */ const ( // General functions RPCPing RPCFunction = "Ping" RPCData RPCFunction = "Data" ) /* RequestArgument is used to identify arguments in a RPC call */ type RequestArgument int /* List of all possible arguments in a RPC request. There are usually no checks which give back an error if a required argument is missing. The RPC API is an internal API and might change without backwards compatibility. */ const ( // General arguments RequestTARGET RequestArgument = iota // Required argument which identifies the target node RequestTOKEN // Client token which is used for authorization checks RequestCTRL // Control object (i.e. what to do with the data) RequestDATA // Data object ) /* rufsServer is the Server instance which serves rpc calls */ var rufsServer *RufsServer /* RufsServer is the RPC exposed Rufs API of a machine. Server is a singleton and will route incoming (authenticated) requests to registered RufsNodes. The calling node is referred to as source node and the called node is referred to as target node. 
*/ type RufsServer struct { nodes map[string]*RufsNode // Map of local RufsNodes } // General functions // ================= /* Ping answers with a Pong if the given client token was verified and the local node exists. */ func (s *RufsServer) Ping(request map[RequestArgument]interface{}, response *interface{}) error { // Verify the given token and retrieve the target member if _, err := s.checkToken(request); err != nil { return err } // Send a simple response res := []string{"Pong"} *response = res return nil } /* Data handles data requests. */ func (s *RufsServer) Data(request map[RequestArgument]interface{}, response *interface{}) error { // Verify the given token and retrieve the target member node, err := s.checkToken(request) if err != nil || node.DataHandler == nil { return err } // Forward to the registered data handler res, err := node.DataHandler(request[RequestCTRL].(map[string]string), request[RequestDATA].([]byte)) if err == nil { *response = res } return err } // Helper functions // ================ /* checkToken checks the member token in a given request. */ func (s *RufsServer) checkToken(request map[RequestArgument]interface{}) (*RufsNode, error) { err := ErrUnknownTarget // Get the target member target := request[RequestTARGET].(string) token := request[RequestTOKEN].(*RufsNodeToken) if node, ok := s.nodes[target]; ok { err = ErrInvalidToken // Generate expected auth from given requesting node name in token and secret of target expectedAuth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(token.NodeName+node.secret))) if token.NodeAuth == expectedAuth { return node, nil } } return nil, err } /* fingerprint converts a given set of bytes to a fingerprint. */ func fingerprint(b []byte) string { var buf bytes.Buffer hs := fmt.Sprintf("%x", sha256.Sum256(b)) for i, c := range hs { buf.WriteByte(byte(c)) if (i+1)%2 == 0 && i != len(hs)-1 { buf.WriteByte(byte(':')) } } return buf.String() }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "fmt" "os" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs" ) /* cmdCd show or change the current directory. */ func cmdCd(tt *TreeTerm, arg ...string) (string, error) { if len(arg) > 0 { tt.cd = tt.parsePathParam(arg[0]) } return fmt.Sprint(tt.cd, "\n"), nil } /* cmdDir shows a directory listing. */ func cmdDir(tt *TreeTerm, arg ...string) (string, error) { return cmdDirListing(tt, false, false, arg...) } /* cmdChecksum shows a directory listing and the checksums. */ func cmdChecksum(tt *TreeTerm, arg ...string) (string, error) { return cmdDirListing(tt, false, true, arg...) } /* cmdTree shows the listing of a directory and its subdirectorie */ func cmdTree(tt *TreeTerm, arg ...string) (string, error) { return cmdDirListing(tt, true, false, arg...) } /* cmdDirListing shows a directory listing and optional also its subdirectories. */ func cmdDirListing(tt *TreeTerm, recursive bool, checksum bool, arg ...string) (string, error) { var dirs []string var fis [][]os.FileInfo var err error var res, rex string dir := tt.cd if len(arg) > 0 { dir = tt.parsePathParam(arg[0]) if len(arg) > 1 { rex, err = stringutil.GlobToRegex(arg[1]) } } if err == nil { if dirs, fis, err = tt.tree.Dir(dir, rex, recursive, checksum); err == nil { res = rufs.DirResultToString(dirs, fis) } } return res, err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "fmt" "os" "path" "path/filepath" "devt.de/krotik/common/bitutil" "devt.de/krotik/rufs" ) /* cmdCat reads and prints the contents of a file. */ func cmdCat(tt *TreeTerm, arg ...string) (string, error) { err := fmt.Errorf("cat requires a file path") if len(arg) > 0 { err = tt.tree.ReadFileToBuffer(tt.parsePathParam(arg[0]), tt.out) } return "", err } /* cmdGet Retrieve a file and store it locally (in the current directory). */ func cmdGet(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("get requires at least a source file path") if lenArg > 0 { var f *os.File src := tt.parsePathParam(arg[0]) dst := src if lenArg > 1 { dst = tt.parsePathParam(arg[1]) } // Make sure we only write files to the local folder _, dst = filepath.Split(dst) if f, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660); err == nil { defer f.Close() if err = tt.tree.ReadFileToBuffer(src, f); err == nil { res = fmt.Sprintf("Written file %s", dst) } } } return res, err } /* cmdPut Read a local file and store it. */ func cmdPut(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("put requires a source and destination file path") if lenArg > 0 { var f *os.File src := arg[0] dst := tt.parsePathParam(arg[1]) if f, err = os.Open(src); err == nil { defer f.Close() if err = tt.tree.WriteFileFromBuffer(dst, f); err == nil { res = fmt.Sprintf("Written file %s", dst) } } } return res, err } /* cmdRm Delete a file or directory. 
*/ func cmdRm(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("rm requires a file path") if lenArg > 0 { p := tt.parsePathParam(arg[0]) dir, file := path.Split(p) if file == "" { // If a path is give just chop off the last slash and try again dir, file = path.Split(dir[:len(dir)-1]) } _, err = tt.tree.ItemOp(dir, map[string]string{ rufs.ItemOpAction: rufs.ItemOpActDelete, rufs.ItemOpName: file, }) } return res, err } /* cmdRen Rename a file or directory. */ func cmdRen(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("ren requires a filename and a new filename") if lenArg > 1 { p := tt.parsePathParam(arg[0]) p2 := tt.parsePathParam(arg[1]) dir1, file1 := path.Split(p) dir2, file2 := path.Split(p2) if file2 == "" || dir2 != "/" { err = fmt.Errorf("new filename must not have a path") } else { if file1 == "" { // If a path is give just chop off the last slash and try again dir1, file1 = path.Split(dir1[:len(dir1)-1]) } _, err = tt.tree.ItemOp(dir1, map[string]string{ rufs.ItemOpAction: rufs.ItemOpActRename, rufs.ItemOpName: file1, rufs.ItemOpNewName: file2, }) } } return res, err } /* cmdMkDir Create a new direectory. */ func cmdMkDir(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("mkdir requires a directory path") if lenArg > 0 { p := tt.parsePathParam(arg[0]) dir, newdir := path.Split(p) if newdir == "" { // If a path is given just chop off the last slash and try again dir, newdir = path.Split(dir[:len(dir)-1]) } _, err = tt.tree.ItemOp(dir, map[string]string{ rufs.ItemOpAction: rufs.ItemOpActMkDir, rufs.ItemOpName: newdir, }) } return res, err } /* cmdCp Copy a file. 
*/ func cmdCp(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("cp requires a source file or directory and a destination directory") if lenArg > 1 { src := tt.parsePathParam(arg[0]) dst := tt.parsePathParam(arg[1]) updFunc := func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64) { if writtenBytes > 0 { tt.WriteStatus(fmt.Sprintf("Copy %v: %v / %v (%v of %v)", file, bitutil.ByteSizeString(writtenBytes, false), bitutil.ByteSizeString(totalBytes, false), currentFile, totalFiles)) } else { tt.ClearStatus() } } if err = tt.tree.Copy([]string{src}, dst, updFunc); err == nil { res = "Done" } } return res, err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "bytes" "fmt" "sort" "unicode/utf8" "devt.de/krotik/common/stringutil" ) /* cmdHelp executes the help command. */ func cmdHelp(tt *TreeTerm, arg ...string) (string, error) { var res bytes.Buffer if len(arg) == 0 { var maxlen = 0 cmds := make([]string, 0, len(helpMap)) res.WriteString("Available commands:\n") res.WriteString("----\n") for c := range helpMap { if cc := utf8.RuneCountInString(c); cc > maxlen { maxlen = cc } cmds = append(cmds, c) } sort.Strings(cmds) for _, c := range cmds { cc := utf8.RuneCountInString(c) spacer := stringutil.GenerateRollingString(" ", maxlen-cc) res.WriteString(fmt.Sprintf("%v%v : %v\n", c, spacer, helpMap[c])) } } return res.String(), nil }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "bytes" "fmt" ) /* cmdReset removes all present mount points or branches. */ func cmdReset(tt *TreeTerm, arg ...string) (string, error) { if len(arg) > 0 { if arg[0] == "mounts" { tt.tree.Reset(false) return "Resetting all mounts\n", nil } else if arg[0] == "branches" { tt.tree.Reset(true) return "Resetting all branches and mounts\n", nil } } return "", fmt.Errorf("Can either reset all [mounts] or all [branches] which includes all mount points") } /* cmdBranch lists all known branches or adds a new branch to the tree. */ func cmdBranch(tt *TreeTerm, arg ...string) (string, error) { var err error var res bytes.Buffer writeKnownBranches := func() { braches, fps := tt.tree.ActiveBranches() for i, b := range braches { res.WriteString(fmt.Sprintf("%v [%v]\n", b, fps[i])) } } if len(arg) == 0 { writeKnownBranches() } else if len(arg) > 1 { var fp = "" branchName := arg[0] branchRPC := arg[1] if len(arg) > 2 { fp = arg[2] } err = tt.tree.AddBranch(branchName, branchRPC, fp) writeKnownBranches() } else { err = fmt.Errorf("branch requires either no or at least 2 parameters") } return res.String(), err } /* cmdMount lists all mount points or adds a new mount point to the tree. 
*/ func cmdMount(tt *TreeTerm, arg ...string) (string, error) { var err error var res bytes.Buffer if len(arg) == 0 { res.WriteString(tt.tree.String()) } else if len(arg) > 1 { dir := arg[0] branchName := arg[1] writable := !(len(arg) > 2 && arg[2] == "ro") // Writeable unless stated otherwise if err = tt.tree.AddMapping(dir, branchName, writable); err == nil { res.WriteString(tt.tree.String()) } } else { err = fmt.Errorf("mount requires either 2 or no parameters") } return res.String(), err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "fmt" "devt.de/krotik/common/bitutil" ) /* cmdSync Make sure dst has the same files and directories as src. */ func cmdSync(tt *TreeTerm, arg ...string) (string, error) { var res string lenArg := len(arg) err := fmt.Errorf("sync requires a source and a destination directory") if lenArg > 1 { src := tt.parsePathParam(arg[0]) dst := tt.parsePathParam(arg[1]) updFunc := func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64) { if writtenBytes > 0 { tt.WriteStatus(fmt.Sprintf("%v (%v/%v) writing: %v -> %v %v / %v", op, currentFile, totalFiles, srcFile, dstFile, bitutil.ByteSizeString(writtenBytes, false), bitutil.ByteSizeString(totalBytes, false))) } else { tt.ClearStatus() fmt.Fprint(tt.out, fmt.Sprintln(fmt.Sprintf("%v (%v/%v) %v -> %v", op, currentFile, totalFiles, srcFile, dstFile))) } } if err = tt.tree.Sync(src, dst, true, updFunc); err == nil { res = "Done" } } return res, err }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package term import ( "fmt" "io" "path" "sort" "strings" "unicode/utf8" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs" ) /* TreeTerm models a command processor for Rufs trees. */ type TreeTerm struct { tree *rufs.Tree // Tree which we operate on cd string // Current directory out io.Writer // Output writer lastStatus string // Last status line } /* NewTreeTerm returns a new command processor for Rufs trees. */ func NewTreeTerm(t *rufs.Tree, out io.Writer) *TreeTerm { return &TreeTerm{t, "/", out, ""} } /* WriteStatus writes a status line to the output writer. */ func (tt *TreeTerm) WriteStatus(line string) { fmt.Fprint(tt.out, "\r") fmt.Fprint(tt.out, line) ll := len(tt.lastStatus) lc := len(line) if ll > lc { fmt.Fprint(tt.out, stringutil.GenerateRollingString(" ", ll-lc)) } tt.lastStatus = line } /* ClearStatus removes the last status line and returns the cursor to the initial position. */ func (tt *TreeTerm) ClearStatus() { if tt.lastStatus != "" { toClear := utf8.RuneCountInString(tt.lastStatus) fmt.Fprint(tt.out, "\r") fmt.Fprint(tt.out, stringutil.GenerateRollingString(" ", toClear)) fmt.Fprint(tt.out, "\r") } } /* CurrentDir returns the current directory of this TreeTerm. */ func (tt *TreeTerm) CurrentDir() string { return tt.cd } /* AddCmd adds a new command to the terminal */ func (tt *TreeTerm) AddCmd(cmd, helpusage, help string, cmdFunc func(*TreeTerm, ...string) (string, error)) { cmdMap[cmd] = cmdFunc helpMap[helpusage] = help } /* Cmds returns a list of available terminal commands. 
*/ func (tt *TreeTerm) Cmds() []string { var cmds []string for k := range cmdMap { cmds = append(cmds, k) } sort.Strings(cmds) return cmds } /* Run executes a given command line. And return its output as a string. File output and other streams to the console are written to the output writer. */ func (tt *TreeTerm) Run(line string) (string, error) { var err error var res string var arg []string // Parse the input c := strings.Split(line, " ") cmd := c[0] if len(c) > 1 { arg = c[1:] } // Execute the given command if f, ok := cmdMap[cmd]; ok { res, err = f(tt, arg...) } else { err = fmt.Errorf("Unknown command: %s", cmd) } return res, err } /* cmdPing pings a remote branch. */ func cmdPing(tt *TreeTerm, arg ...string) (string, error) { var res string err := fmt.Errorf("ping requires at least a branch name") if len(arg) > 0 { var fp, rpc string if len(arg) > 1 { rpc = arg[1] } if fp, err = tt.tree.PingBranch(arg[0], rpc); err == nil { res = fmt.Sprint("Response ok - fingerprint: ", fp, "\n") } } return res, err } /* cmdRefresh refreshes all known branches and connects depending on if the branches are reachable. */ func cmdRefresh(tt *TreeTerm, arg ...string) (string, error) { tt.tree.Refresh() return "Done", nil } /* parsePathParam parse a given path parameter and return an absolute path. */ func (tt *TreeTerm) parsePathParam(p string) string { if !strings.HasPrefix(p, "/") { p = path.Join(tt.cd, p) // Take care of relative paths } return p }
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package rufs import ( "bytes" "crypto/tls" "encoding/gob" "encoding/json" "fmt" "io" "os" "path" "regexp" "sort" "strings" "sync" "time" "unicode/utf8" "devt.de/krotik/common/bitutil" "devt.de/krotik/common/fileutil" "devt.de/krotik/common/stringutil" "devt.de/krotik/rufs/config" "devt.de/krotik/rufs/node" ) /* Tree models a Rufs client which combines several branches. */ type Tree struct { client *node.Client // RPC client treeLock *sync.RWMutex // Lock for maps root *treeItem // Tree root item branches []map[string]string // Added working branches branchesAll []map[string]string // All added branches also not working mapping []map[string]interface{} // Mappings from working branches mappingAll []map[string]interface{} // All used mappings } /* NewTree creates a new tree. */ func NewTree(cfg map[string]interface{}, cert *tls.Certificate) (*Tree, error) { var err error var t *Tree // Make sure the given config is ok if err = config.CheckTreeConfig(cfg); err == nil { // Create RPC client c := node.NewClient(fileutil.ConfStr(cfg, config.TreeSecret), cert) // Create the tree t = &Tree{c, &sync.RWMutex{}, &treeItem{make(map[string]*treeItem), []string{}, []bool{}}, []map[string]string{}, []map[string]string{}, []map[string]interface{}{}, []map[string]interface{}{}} } return t, err } /* Config returns the current tree configuration as a JSON string. */ func (t *Tree) Config() string { t.treeLock.RLock() defer t.treeLock.RUnlock() out, _ := json.MarshalIndent(map[string]interface{}{ "branches": t.branches, "tree": t.mapping, }, "", " ") return string(out) } /* SetMapping adds a given tree mapping configuration in a JSON string. 
*/ func (t *Tree) SetMapping(config string) error { var err error var conf map[string][]map[string]interface{} // Unmarshal the config if err = json.Unmarshal([]byte(config), &conf); err == nil { // Reset the whole tree t.Reset(true) if branches, ok := conf["branches"]; ok { for _, b := range branches { t.AddBranch(b["branch"].(string), b["rpc"].(string), b["fingerprint"].(string)) } } if mounts, ok := conf["tree"]; ok { for _, m := range mounts { t.AddMapping(m["path"].(string), m["branch"].(string), m["writeable"].(bool)) } } } return err } /* KnownBranches returns a map of all known branches (active or not reachable). Caution: This map contains also the map of active branches with their fingerprints it should only be used for read operations. */ func (t *Tree) KnownBranches() map[string]map[string]string { ret := make(map[string]map[string]string) t.treeLock.RLock() t.treeLock.RUnlock() for _, b := range t.branchesAll { ret[b["branch"]] = b } return ret } /* ActiveBranches returns a list of all known active branches and their fingerprints. */ func (t *Tree) ActiveBranches() ([]string, []string) { return t.client.Peers() } /* NotReachableBranches returns a map of all known branches which couldn't be reached. The map contains the name and the definition of the branch. */ func (t *Tree) NotReachableBranches() map[string]map[string]string { ret := make(map[string]map[string]string) t.treeLock.RLock() t.treeLock.RUnlock() activeBranches := make(map[string]map[string]string) for _, b := range t.branches { activeBranches[b["branch"]] = b } for _, b := range t.branchesAll { name := b["branch"] if _, ok := activeBranches[name]; !ok { ret[name] = b } } return ret } /* PingBranch sends a ping to a remote branch and returns its fingerprint or an error. 
*/ func (t *Tree) PingBranch(node string, rpc string) (string, error) { _, fp, err := t.client.SendPing(node, rpc) return fp, err } /* Reset either resets only all mounts or if the branches flag is specified also all known branches. */ func (t *Tree) Reset(branches bool) { if branches { peers, _ := t.client.Peers() for _, p := range peers { t.client.RemovePeer(p) } t.branches = []map[string]string{} t.branchesAll = []map[string]string{} } t.treeLock.Lock() defer t.treeLock.Unlock() t.mapping = []map[string]interface{}{} t.mappingAll = []map[string]interface{}{} t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}} } /* Refresh refreshes all known branches and mappings. Only reachable branches will be mapped into the tree. */ func (t *Tree) Refresh() { addBranches := make(map[string]map[string]string) delBranches := make(map[string]map[string]string) nrBranches := t.NotReachableBranches() // Check all known branches and decide if they should be added or removed t.treeLock.RLock() for _, data := range t.branchesAll { branchName := data["branch"] branchRPC := data["rpc"] _, knownAsNotWorking := nrBranches[branchName] // Ping the branch _, _, err := t.client.SendPing(branchName, branchRPC) if err == nil && knownAsNotWorking { // Success branch can now be reached addBranches[branchName] = data } else if err != nil && !knownAsNotWorking { // Failure branch can no longer be reached delBranches[branchName] = data } } t.treeLock.RUnlock() // Now lock the tree and add/remove branches t.treeLock.Lock() for i, b := range t.branches { branchName := b["branch"] if _, ok := delBranches[branchName]; ok { t.client.RemovePeer(branchName) t.branches = append(t.branches[:i], t.branches[i+1:]...) 
} } for _, b := range addBranches { branchName := b["branch"] branchRPC := b["rpc"] branchFingerprint := b["fingerprint"] t.client.RegisterPeer(branchName, branchRPC, branchFingerprint) t.branches = append(t.branches, b) } // Rebuild all mappings mappings := t.mappingAll t.mapping = []map[string]interface{}{} t.mappingAll = []map[string]interface{}{} t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}} t.treeLock.Unlock() for _, m := range mappings { t.AddMapping(fmt.Sprint(m["path"]), fmt.Sprint(m["branch"]), m["writeable"].(bool)) } } /* AddBranch adds a branch to the tree. */ func (t *Tree) AddBranch(branchName string, branchRPC string, branchFingerprint string) error { branchMap := map[string]string{ "branch": branchName, "rpc": branchRPC, "fingerprint": branchFingerprint, } t.branchesAll = append(t.branchesAll, branchMap) // First ping the branch and see if we get a response _, fp, err := t.client.SendPing(branchName, branchRPC) // Only add the branch as active if we've seen it if err == nil { if branchFingerprint != "" && branchFingerprint != fp { err = fmt.Errorf("Remote branch has an unexpected fingerprint\nPresented fingerprint: %s\nExpected fingerprint : %s", branchFingerprint, fp) } else { t.treeLock.Lock() defer t.treeLock.Unlock() if err = t.client.RegisterPeer(branchName, branchRPC, fp); err == nil { // Once we know and accepted the fingerprint we change it // // Remote branches will never change their fingerprint // during a single network session branchMap["fingerprint"] = fp t.branches = append(t.branches, branchMap) // Store the added branch } } } return err } /* AddMapping adds a mapping from tree path to a branch. 
*/ func (t *Tree) AddMapping(dir, branchName string, writable bool) error { t.treeLock.Lock() defer t.treeLock.Unlock() err := node.ErrUnknownTarget mappingMap := map[string]interface{}{ "path": dir, "branch": branchName, "writeable": writable, } t.mappingAll = append(t.mappingAll, mappingMap) peers, _ := t.client.Peers() for _, p := range peers { if p == branchName { // Split the given path and add the mapping t.root.addMapping(createMappingPath(dir), branchName, writable) t.mapping = append(t.mapping, mappingMap) err = nil } } return err } /* String returns a string representation of this tree. */ func (t *Tree) String() string { if t.treeLock != nil { t.treeLock.RLock() defer t.treeLock.RUnlock() } var buf bytes.Buffer buf.WriteString("/: ") if t != nil && t.root != nil { t.root.String(1, &buf) } return buf.String() } // Client API // ========== /* Dir returns file listings matching a given pattern of one or more directories. The contents of the given path is returned. Optionally, also the contents of all subdirectories can be returned if the recursive flag is set. The return values is a list of traversed directories and their corresponding contents. 
*/
func (t *Tree) Dir(dir string, pattern string, recursive bool, checksums bool) ([]string, [][]os.FileInfo, error) {
	var err error
	var dirs []string
	var fis [][]os.FileInfo

	// Compile pattern - an invalid pattern is reported before any
	// branch is queried

	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, nil, err
	}

	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	// Strip off trailing slashes to normalize the input

	if strings.HasSuffix(dir, "/") {
		dir = dir[:len(dir)-1]
	}

	// treeVisitor queries every branch of a visited tree item and merges
	// the results into dirs and fis. The first error stops all further
	// queries (err is shared with the enclosing function).

	treeVisitor := func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

		for _, b := range branches {
			var res []byte

			if err == nil {

				res, err = t.client.SendData(b, map[string]string{
					ParamAction:    OpDir,
					ParamPath:      path.Join(branchPath...),
					ParamPattern:   fmt.Sprint(pattern),
					ParamRecursive: fmt.Sprint(recursive),
					ParamChecksums: fmt.Sprint(checksums),
				}, nil)

				if err == nil {
					var dest []interface{}

					// Unpack the result - it is expected to hold the list
					// of directories followed by the list of their contents

					if err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&dest); err == nil {

						bdirs := dest[0].([]string)
						bfis := dest[1].([][]os.FileInfo)

						// Construct the actual tree path for the returned directories

						for i, d := range bdirs {
							bdirs[i] = path.Join(treePath, d)

							// Merge these results into the overall results

							found := false

							for j, dir := range dirs {

								// Check if a directory from the result is already
								// in the overall result

								if dir == bdirs[i] {
									found = true

									// Create a map of existing names to avoid duplicates

									existing := make(map[string]bool)
									for _, fi := range fis[j] {
										existing[fi.Name()] = true
									}

									// Only add new files to the overall result - entries
									// of earlier branches win over later ones

									for _, fi := range bfis[i] {
										if _, ok := existing[fi.Name()]; !ok {
											fis[j] = append(fis[j], fi)
										}
									}
								}
							}

							if !found {

								// Just append if the directory is not in the
								// overall results yet

								dirs = append(dirs, bdirs[i])
								fis = append(fis, bfis[i])
							}
						}
					}
				}
			}
		}
	}

	t.root.findPathBranches("/", createMappingPath(dir), recursive, treeVisitor)

	// Add pseudo directories for mapping components which have no corresponding
	// real directories

	// dirsMap maps a directory path to its index in dirs and fis

	dirsMap := make(map[string]int)
	for i, d := range dirs {
		dirsMap[d] = i
	}

	t.root.findPathBranches("/", createMappingPath(dir), recursive,
		func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

			// Tree items above the requested directory are visited as well
			// but must not show up in the result

			if !strings.HasPrefix(treePath, dir) {
				return
			}

			idx, ok := dirsMap[treePath]

			if !ok {

				// Create the entry if it does not exist

				dirs = append(dirs, treePath)
				idx = len(dirs) - 1
				dirsMap[treePath] = idx
				fis = append(fis, []os.FileInfo{})
			}

			// Add pseudo dirs if a physical directory is not present

			for n := range item.children {
				found := false

				for _, fi := range fis[idx] {
					if fi.Name() == n {
						found = true
						break
					}
				}

				if found {
					continue
				}

				if re.MatchString(n) {

					// Append if it matches the pattern

					fis[idx] = append(fis[idx], &FileInfo{
						FiName:    n,
						FiSize:    0,
						FiMode:    os.FileMode(os.ModeDir | 0777),
						FiModTime: time.Time{},
					})
				}
			}
		})

	return dirs, fis, err
}

/*
Stat returns information about a given item. Use this function to find out
if a given path is a file or directory. An error with the IsNotExist flag
is returned if the item could not be found.
*/
func (t *Tree) Stat(item string) (os.FileInfo, error) {

	// List the parent directory of the item. The item name is given to
	// Dir as pattern - Dir compiles it as a regular expression.
	// NOTE(review): a name with regular expression meta characters can
	// therefore fail to compile or match other names as well - the exact
	// comparison below picks the wanted entry.

	dir, file := path.Split(item)

	_, fis, err := t.Dir(dir, file, false, true)

	if len(fis) == 1 {
		for _, fi := range fis[0] {

			if fi.Name() == file {
				return fi, err
			}
		}
	}

	// Nothing found - report this as a not exist error unless there
	// was already another error

	if err == nil {
		err = &node.Error{
			Type:       node.ErrRemoteAction,
			Detail:     os.ErrNotExist.Error(),
			IsNotExist: true,
		}
	}

	return nil, err
}

/*
Copy is a general purpose copy function which creates files and directories.
Destination must be a directory. A non-existing destination directory will be
created.
*/ func (t *Tree) Copy(src []string, dst string, updFunc func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error { var err error var relPaths []string files := make(map[string]os.FileInfo) // Make sure any file is only copied once paths := make(map[string]string) // Add files to be copied to items for _, s := range src { var fi os.FileInfo fi, err = t.Stat(s) if fi, err = t.Stat(s); fi != nil { if fi.IsDir() { // Find all files inside directories if dirs, fis, err := t.Dir(s, "", true, false); err == nil { for i, d := range dirs { for _, fi2 := range fis[i] { if !fi2.IsDir() { // Calculate the relative path by removing // source path from the absolute path relPath := path.Join(d, fi2.Name())[len(s):] relPath = path.Join("/"+fi.Name(), relPath) relPaths = append(relPaths, relPath) files[relPath] = fi2 paths[relPath] = path.Join(d, fi2.Name()) } } } } } else { // Single files are just added - these files will always // be at the root of the destination relPath := "/" + fi.Name() relPaths = append(relPaths, relPath) files[relPath] = fi paths[relPath] = s } } if err != nil { err = fmt.Errorf("Cannot stat %v: %v", s, err.Error()) break } } if err == nil { var allFiles, cnt int64 // Copy all found files allFiles = int64(len(files)) for _, k := range relPaths { var totalSize, totalTransferred int64 cnt++ fi := files[k] totalSize = fi.Size() srcFile := paths[k] err = t.CopyFile(srcFile, path.Join(dst, k), func(b int) { if b >= 0 { totalTransferred += int64(b) updFunc(k, totalTransferred, totalSize, cnt, allFiles) } else { updFunc(k, int64(b), totalSize, cnt, allFiles) } }) if err != nil { err = fmt.Errorf("Cannot copy %v to %v: %v", srcFile, dst, err.Error()) break } } } return err } /* Sync operations */ const ( SyncCreateDirectory = "Create directory" SyncCopyFile = "Copy file" SyncRemoveDirectory = "Remove directory" SyncRemoveFile = "Remove file" ) /* Sync a given destination with a given source directory. 
After this command has finished the dstDir will have the same files and directories as the srcDir. */ func (t *Tree) Sync(srcDir string, dstDir string, recursive bool, updFunc func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error { var currentFile, totalFiles int64 t.treeLock.RLock() defer t.treeLock.RUnlock() // doSync syncs a given src directory doSync := func(dir string, finfos []os.FileInfo) error { sdir := path.Join(srcDir, dir) ddir := path.Join(dstDir, dir) // Query the corresponding destination to see what is there _, dstFis, err := t.Dir(ddir, "", false, true) if err == nil { fileMap := make(map[string]string) // Map to quickly lookup destination files dirMap := make(map[string]bool) // Map to quickly lookup destination directories if len(dstFis) > 0 { for _, fi := range dstFis[0] { if fi.IsDir() { dirMap[fi.Name()] = true } else { fileMap[fi.Name()] = fi.(*FileInfo).Checksum() } } } // Go through the given source file infos and see what needs to be copied for _, fi := range finfos { currentFile++ // Check if we have a directory or a file if fi.IsDir() { if _, ok := dirMap[fi.Name()]; !ok { // Create all directories which aren't there if updFunc != nil { updFunc(SyncCreateDirectory, "", path.Join(ddir, fi.Name()), 0, 0, currentFile, totalFiles) } _, err = t.ItemOp(ddir, map[string]string{ ItemOpAction: ItemOpActMkDir, ItemOpName: fi.Name(), }) } // Remove existing directories from the map so we can // use the map to remove directories which shouldn't // be there delete(dirMap, fi.Name()) } else { fsum, ok := fileMap[fi.Name()] if !ok || fsum != fi.(*FileInfo).Checksum() { var u func(b int) s := path.Join(sdir, fi.Name()) d := path.Join(ddir, fi.Name()) // Copy the file if it does not exist or the checksum // is not matching if updFunc != nil { var totalTransferred, totalSize int64 totalSize = fi.Size() u = func(b int) { if b >= 0 { totalTransferred += int64(b) updFunc(SyncCopyFile, s, d, totalTransferred, 
totalSize, currentFile, totalFiles) } else { updFunc(SyncCopyFile, s, d, int64(b), totalSize, currentFile, totalFiles) } } } if err = t.CopyFile(s, d, u); err != nil && updFunc != nil { // Note at which point the error message was produced updFunc(SyncCopyFile, s, d, 0, fi.Size(), currentFile, totalFiles) } } // Remove existing files from the map so we can // use the map to remove files which shouldn't // be there delete(fileMap, fi.Name()) } if err != nil { break } } if err == nil { // Remove files and directories which are in the destination but // not in the source for d := range dirMap { if err == nil { if updFunc != nil { p := path.Join(ddir, d) updFunc(SyncRemoveDirectory, "", p, 0, 0, currentFile, totalFiles) } _, err = t.ItemOp(ddir, map[string]string{ ItemOpAction: ItemOpActDelete, ItemOpName: d, }) } } for f := range fileMap { if err == nil { if updFunc != nil { p := path.Join(ddir, f) updFunc(SyncRemoveFile, "", p, 0, 0, currentFile, totalFiles) } _, err = t.ItemOp(ddir, map[string]string{ ItemOpAction: ItemOpActDelete, ItemOpName: f, }) } } } } return err } // We only query the source once otherwise we might end up in an // endless loop if for example the dstDir is a subdirectory of srcDir srcDirs, srcFis, err := t.Dir(srcDir, "", recursive, true) if err == nil { for _, fis := range srcFis { totalFiles += int64(len(fis)) } for i, dir := range srcDirs { if err = doSync(relPath(dir, srcDir), srcFis[i]); err != nil { break } } } return err } /* CopyFile copies a given file using a simple io.Pipe. 
*/ func (t *Tree) CopyFile(srcPath, dstPath string, updFunc func(writtenBytes int)) error { var pw io.WriteCloser var err, rerr error t.treeLock.RLock() defer t.treeLock.RUnlock() // Use a pipe to stream the contents of the source file to the destination file pr, pw := io.Pipe() if updFunc != nil { // Wrap the writer of the pipe pw = &statusUpdatingWriter{pw, updFunc} } // Make sure the src exists if _, rerr = t.ReadFile(srcPath, []byte{}, 0); rerr == nil { // Read the source in a go routine go func() { rerr = t.ReadFileToBuffer(srcPath, pw) pw.Close() }() // Write the destination file - this will return once the // writer is closed err = t.WriteFileFromBuffer(dstPath, pr) } if rerr != nil { // Check if we got an empty file if IsEOF(rerr) { _, err = t.WriteFile(dstPath, nil, 0) updFunc(0) // Report the creation of the empty file rerr = nil } else { // Read errors are reported before write errors err = rerr } } pr.Close() return err } /* ReadFileToBuffer reads a complete file into a given buffer which implements io.Writer. */ func (t *Tree) ReadFileToBuffer(spath string, buf io.Writer) error { var n int var err error var offset int64 readBuf := make([]byte, DefaultReadBufferSize) for err == nil { n, err = t.ReadFile(spath, readBuf, offset) if err == nil { _, err = buf.Write(readBuf[:n]) offset += int64(n) } else if IsEOF(err) { // We reached the end of the file err = nil break } } return err } /* ReadFile reads up to len(p) bytes into p from the given offset. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. 
*/ func (t *Tree) ReadFile(spath string, p []byte, offset int64) (int, error) { var err error var n int var success bool t.treeLock.RLock() defer t.treeLock.RUnlock() err = &node.Error{ Type: node.ErrRemoteAction, Detail: os.ErrNotExist.Error(), IsNotExist: true, } dir, file := path.Split(spath) t.root.findPathBranches(dir, createMappingPath(dir), false, func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) { for _, b := range branches { if !success { // Only try other branches if we didn't have a success before var res []byte rpath := path.Join(branchPath...) rpath = path.Join(rpath, file) if res, err = t.client.SendData(b, map[string]string{ ParamAction: OpRead, ParamPath: rpath, ParamOffset: fmt.Sprint(offset), ParamSize: fmt.Sprint(len(p)), }, nil); err == nil { var dest []interface{} // Unpack the result if err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&dest); err == nil { n = dest[0].(int) buf := dest[1].([]byte) copy(p, buf) } } success = err == nil // Special case EOF if IsEOF(err) { success = true } } } }) return n, err } /* WriteFileFromBuffer writes a complete file from a given buffer which implements io.Reader. */ func (t *Tree) WriteFileFromBuffer(spath string, buf io.Reader) error { var err error var offset int64 writeBuf := make([]byte, DefaultReadBufferSize) for err == nil { var n int if n, err = buf.Read(writeBuf); err == nil { _, err = t.WriteFile(spath, writeBuf[:n], offset) offset += int64(n) } else if IsEOF(err) { // We reached the end of the file t.WriteFile(spath, []byte{}, offset) err = nil break } } return err } /* WriteFile writes p into the given file from the given offset. It returns the number of written bytes and any error encountered. 
*/
func (t *Tree) WriteFile(spath string, p []byte, offset int64) (int, error) {
	var err error
	var n, totalCount, ignoreCount int

	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	dir, file := path.Split(spath)

	// The write request is sent to every writable branch found for the
	// directory of the file. The first error stops all further requests.

	t.root.findPathBranches(dir, createMappingPath(dir), false,
		func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

			for i, b := range branches {
				var res []byte

				if err == nil {

					totalCount++

					if !writable[i] {

						// Ignore all non-writable branches

						ignoreCount++
						continue
					}

					rpath := path.Join(branchPath...)
					rpath = path.Join(rpath, file)

					if res, err = t.client.SendData(b, map[string]string{
						ParamAction: OpWrite,
						ParamPath:   rpath,
						ParamOffset: fmt.Sprint(offset),
					}, p); err == nil {

						// The response is decoded into the number of written
						// bytes - the value of the last answering branch is returned

						err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&n)
					}
				}
			}
		})

	// Both counters are also equal (zero) if no branch was found at all

	if err == nil && totalCount == ignoreCount {
		err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")
	}

	return n, err
}

/*
ItemOp executes a file or directory specific operation which can either
succeed or fail (e.g. rename or delete). Actions and parameters should
be given in the opdata map. The returned flag is true if at least one
branch reported a positive result.
*/
func (t *Tree) ItemOp(dir string, opdata map[string]string) (bool, error) {
	var err error
	var ret, recurse bool
	var totalCount, ignoreCount, notFoundCount int

	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	// Work on a copy so the map of the caller is not modified

	data := make(map[string]string)
	for k, v := range opdata {
		data[k] = v
	}

	data[ParamAction] = OpItemOp

	// Check if we should recurse - this is requested by an item name ending with "**"

	if r, ok := data[ItemOpName]; ok {
		recurse = strings.HasSuffix(r, "**")
	}

	t.root.findPathBranches(dir, createMappingPath(dir), recurse,
		func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

			for i, b := range branches {
				var res []byte

				totalCount++

				if !writable[i] {

					// Ignore all non-writable branches

					ignoreCount++
					continue
				}

				if err == nil {

					data[ParamPath] = path.Join(branchPath...)

					res, err = t.client.SendData(b, data, nil)

					if rerr, ok := err.(*node.Error); ok && rerr.IsNotExist {

						// Only count the not exist errors as this might only
						// be true for some branches

						notFoundCount++
						err = nil

					} else if err == nil {
						var bres bool

						// Decode the result of the operation from the response

						err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&bres)

						ret = ret || bres // One positive result is enough
					}
				}
			}
		})

	// The checks below replace any error of the requests. Both are also
	// true if no branch was found at all (all counters are zero) - the
	// first check wins in this case.

	if totalCount == ignoreCount {

		err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")

	} else if totalCount == notFoundCount+ignoreCount {

		// Every writable branch reported that the item does not exist

		err = &node.Error{
			Type:       node.ErrRemoteAction,
			Detail:     os.ErrNotExist.Error(),
			IsNotExist: true,
		}
	}

	return ret, err
}

// Util functions
// ==============

/*
createMappingPath properly splits a given path into a mapping path. The
path is split at slashes and empty components are dropped. The result is
nil if the path has no non-empty component.
*/
func createMappingPath(path string) []string {
	var ret []string

	for _, i := range strings.Split(path, "/") {

		if i == "" {

			// Ignore empty child names

			continue
		}

		ret = append(ret, i)
	}

	return ret
}

/*
DirResultToString formats a given Dir result into a human-readable string.
*/ func DirResultToString(paths []string, infos [][]os.FileInfo) string { var buf bytes.Buffer // Sort the paths sort.Sort(&dirResult{paths, infos}) // Sort the FileInfos within the paths for _, fis := range infos { sort.Sort(fileInfoSlice(fis)) } for i, p := range paths { var maxlen int fis := infos[i] buf.WriteString(p) buf.WriteString("\n") sizeStrings := make([]string, 0, len(fis)) for _, fi := range fis { sizeString := bitutil.ByteSizeString(fi.Size(), false) if strings.HasSuffix(sizeString, " B") { sizeString += " " // Unit should always be 3 runes } if l := utf8.RuneCountInString(sizeString); l > maxlen { maxlen = l } sizeStrings = append(sizeStrings, sizeString) } for j, fi := range fis { sizeString := sizeStrings[j] sizePrefix := stringutil.GenerateRollingString(" ", maxlen-utf8.RuneCountInString(sizeString)) if rfi, ok := fi.(*FileInfo); ok && rfi.FiChecksum != "" { buf.WriteString(fmt.Sprintf("%v %v%v %v [%s]\n", fi.Mode(), sizePrefix, sizeString, fi.Name(), rfi.Checksum())) } else { buf.WriteString(fmt.Sprintf("%v %v%v %v\n", fi.Mode(), sizePrefix, sizeString, fi.Name())) } } if i < len(paths)-1 { buf.WriteString("\n") } } return buf.String() } // Helper functions // ================ // Helper function to normalise relative paths /* relPath create a normalized relative path by removing a given path prefix. 
*/
func relPath(path, prefix string) string {

	// normalize ensures a leading slash and removes a single trailing one

	normalize := func(p string) string {
		if !strings.HasPrefix(p, "/") {
			p = "/" + p
		}
		return strings.TrimSuffix(p, "/")
	}

	normPath := normalize(path)
	normPrefix := normalize(prefix)

	if !strings.HasPrefix(normPath, normPrefix) {

		// Without the prefix only the normalized path is returned

		return normPath
	}

	if rest := normPath[len(normPrefix):]; rest != "" {
		return rest
	}

	return "/"
}

// Helper objects to sort dir results

/*
dirResult sorts the paths of a Dir result and keeps the file infos of each
path at the same index as the path.
*/
type dirResult struct {
	paths []string
	infos [][]os.FileInfo
}

func (r *dirResult) Len() int {
	return len(r.paths)
}

func (r *dirResult) Less(i, j int) bool {
	return r.paths[i] < r.paths[j]
}

func (r *dirResult) Swap(i, j int) {
	r.paths[j], r.paths[i] = r.paths[i], r.paths[j]
	r.infos[j], r.infos[i] = r.infos[i], r.infos[j]
}

/*
fileInfoSlice sorts file infos by their name.
*/
type fileInfoSlice []os.FileInfo

func (f fileInfoSlice) Len() int {
	return len(f)
}

func (f fileInfoSlice) Less(i, j int) bool {
	return f[i].Name() < f[j].Name()
}

func (f fileInfoSlice) Swap(i, j int) {
	f[j], f[i] = f[i], f[j]
}

// Helper object to given status updates when copying files

/*
statusUpdatingWriter is an internal io.WriteCloser which is used for status
updates. Every write and the closing of the writer are reported to the
statusUpdate function.
*/
type statusUpdatingWriter struct {
	io.WriteCloser
	statusUpdate func(writtenBytes int)
}

/*
Write writes len(p) bytes from p to the writer. The number of written bytes
is reported to the status function.
*/
func (w *statusUpdatingWriter) Write(p []byte) (int, error) {
	written, err := w.WriteCloser.Write(p)

	w.statusUpdate(written)

	return written, err
}

/*
Close closes the writer. The status function is called with -1 before the
underlying writer is closed.
*/
func (w *statusUpdatingWriter) Close() error {
	w.statusUpdate(-1)

	return w.WriteCloser.Close()
}
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package rufs import ( "bytes" "fmt" "path" "sort" "devt.de/krotik/common/errorutil" "devt.de/krotik/common/stringutil" ) /* treeItem models an item in the tree. This is an internal data structure which is not exposed. */ type treeItem struct { children map[string]*treeItem // Mapping from path component to branch remoteBranches []string // List of remote branches which are present on this level remoteBranchWriting []bool // Flag if the remote branch should receive write requests } /* findPathBranches finds all relevant branches for a single path. The iterator function receives 4 parameters: The tree item, the total path within the tree, the subpath within the branch and a list of all branches for the tree path. Calling code should always give a treePath of "/". */ func (t *treeItem) findPathBranches(treePath string, branchPath []string, recursive bool, visit func(*treeItem, string, []string, []string, []bool)) { visit(t, treePath, branchPath, t.remoteBranches, t.remoteBranchWriting) if len(branchPath) > 0 { if c, ok := t.children[branchPath[0]]; ok { // Check if a subpath matches c.findPathBranches(path.Join(treePath, branchPath[0]), branchPath[1:], recursive, visit) } } else if recursive { var childNames []string for n := range t.children { childNames = append(childNames, n) } sort.Strings(childNames) for _, n := range childNames { t.children[n].findPathBranches(path.Join(treePath, n), branchPath, recursive, visit) } } } /* addMapping adds a new mapping. 
*/
func (t *treeItem) addMapping(mappingPath []string, branchName string, writable bool) {

	// Walk down the mapping path and create all missing items on the way

	cur := t

	for _, childName := range mappingPath {

		errorutil.AssertTrue(childName != "", "Adding a mapping with an empty path is not supported")

		next, ok := cur.children[childName]

		if !ok {
			next = &treeItem{make(map[string]*treeItem), []string{}, []bool{}}
			cur.children[childName] = next
		}

		cur = next
	}

	// Add the branch to the item at the end of the path - keep the order
	// in which the branches were added

	cur.remoteBranches = append(cur.remoteBranches, branchName)
	cur.remoteBranchWriting = append(cur.remoteBranchWriting, writable)
}

/*
String writes a string representation of this item and its children into
the given buffer. The branches of the item are written on a single line,
each marked as writable (w) or read-only (r). The children follow in the
order of their names on lines indented by the given indent.
*/
func (t *treeItem) String(indent int, buf *bytes.Buffer) {

	lastBranch := len(t.remoteBranches) - 1

	for idx, branch := range t.remoteBranches {
		marker := "(r)"
		if t.remoteBranchWriting[idx] {
			marker = "(w)"
		}

		buf.WriteString(branch)
		buf.WriteString(marker)

		if idx < lastBranch {
			buf.WriteString(", ")
		}
	}

	buf.WriteString("\n")

	// Sort the names of the children for a stable output

	childNames := make([]string, 0, len(t.children))

	for name := range t.children {
		childNames = append(childNames, name)
	}

	sort.Strings(childNames)

	for _, name := range childNames {
		child := t.children[name]

		buf.WriteString(stringutil.GenerateRollingString(" ", indent*2))
		fmt.Fprintf(buf, "%v/: ", name)

		child.String(indent+1, buf)
	}
}
/* * Rufs - Remote Union File System * * Copyright 2017 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package rufs import ( "io" "devt.de/krotik/rufs/node" ) /* IsEOF tests if the given error is an EOF error. */ func IsEOF(err error) bool { if err == io.EOF { return true } if rerr, ok := err.(*node.Error); ok { return rerr.Detail == io.EOF.Error() } return false }